In [2]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [3]:
retailAll = '../data/retail-data/all/'
flightData2010 = '../data/flight-data/parquet/2010-summary.parquet'

### Resilient Distributed Datasets (RDDs)

-  **Spak operates on a per-partition basis when executing code**
-  Basically all DF Spark code compiles down to an RDD
-  When calling a DF transformation, the underlying logic becomes a set of RDD transformations
-  SparkContext is the entry point for low-level API functionality accessed through SparkSession
-  Main reason to use RDDs is for fine grained control over physical distribution of data (**custom partitioning of data**)
-  RDD performance is best via Scala/Java
<br>
-  RDDs:
    -  Caching:
        -  ability to cache or persists an RDD
        -  ability to specify a storage level [org.apache.spark.storage.StorageLevel] (combinations of memory only, disk only, and off heap)
    -  Checkpointing:
        -  saves RDD to risk so future computations on that RDD point to its partitions on disk rather than recomputing the RDD from the original source
        -  similar to caching except checkpointing is stored only on disk and not in memory (like cache)
        -  when checkpointed RDD is referenced it derives from checkpoint instead of source data, which helps improve performance and optimization
        <br>
-  Shared Variables:
    -  broadcast variables
    -  accumulators

### _RDD to DF Example_

In [4]:
rdd1 = spark.range(3).rdd
rdd1.collect()

[Row(id=0), Row(id=1), Row(id=2)]

In [3]:
for i in spark.range(3).rdd.collect(): print(i)

Row(id=0)
Row(id=1)
Row(id=2)


In [4]:
spark.range(3).rdd.toDF().show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
+---+



### _Local Collection to RDD Example_

In [5]:
myCollection = "Training on Python, Spark, Scala, ML".split(" ")


In [6]:
myCollection

['Training', 'on', 'Python,', 'Spark,', 'Scala,', 'ML']

In [7]:
words = spark.sparkContext.parallelize(myCollection, 2) # sets number of partitions

In [8]:
words.getNumPartitions()

2

In [8]:
words.setName("myWords") # names app for Spark UI
for i in words.collect(): print(i)

Training
on
Python,
Spark,
Scala,
ML


### _RDD Data Source Read Example_

### RDD Transformation Examples:

In [11]:
# distinct
print(words.distinct().count())

6


In [13]:
words.collect()

['Training', 'on', 'Python,', 'Spark,', 'Scala,', 'ML']

In [14]:
# filter
def startsWithS(individual):
  return individual.startswith("S")

print(words.filter(lambda word: startsWithS(word)).collect())


['Spark,', 'Scala,']


In [24]:
# map

words2 = words.map(
    lambda word: 
        (
            word, word[0], word.startswith("S")
        )
)

In [26]:
words2.collect()

[('Training', 'T', False),
 ('on', 'o', False),
 ('Python,', 'P', False),
 ('Spark,', 'S', True),
 ('Scala,', 'S', True),
 ('ML', 'M', False)]

In [29]:
print(words2.filter(
    lambda record: record[2]
).collect())

[('Spark,', 'S', True), ('Scala,', 'S', True)]


In [14]:
# sort
print(words.sortBy(lambda word: len(word) * -1).collect())

['Training', 'Python,', 'Spark,', 'Scala,', 'on', 'ML']


In [43]:
# randomSplit
fiftyFiftySplit = words.randomSplit([0.5, 0.5], seed=1)

print(fiftyFiftySplit[0].collect())
print(fiftyFiftySplit[1].collect())

['Training', 'Python,', 'Scala,']
['on', 'Spark,', 'ML']


### RDD Actions Examples

In [44]:
# reduce
print(spark.sparkContext.parallelize(range(1, 21)).reduce(lambda x, y: x + y))

210


In [16]:
words.collect()

['Training', 'on', 'Python,', 'Spark,', 'Scala,', 'ML']

In [18]:
def wordLengthReducer(leftWord, rightWord):
  print("wordLengthReducer: ", leftWord, leftWord)
  if len(leftWord) < len(rightWord):
    print("<", leftWord, rightWord)
    return leftWord
  else:
    print(">=", leftWord, rightWord)
    return rightWord

print(words.reduce(wordLengthReducer))


wordLengthReducer:  on on
>= on ML
ML


In [49]:
# count
print(words.count())

# countApprox
confidence = 0.95
timeoutMilliseconds = 400

# Approximate version of count() that returns a potentially incomplete result within a timeout, even if not all tasks have finished.
print(words.countApprox(timeoutMilliseconds, confidence))

6
6


In [50]:
# countByValue
print(words.countByValue())

defaultdict(<class 'int'>, {'Training': 1, 'on': 1, 'Python,': 1, 'Spark,': 1, 'Scala,': 1, 'ML': 1})


In [51]:
# first
print(words.first())

Training


In [18]:
# take
print(words.take(3)) # returns values
print(words.takeOrdered(3)) # asc order

['Training', 'on', 'Python,']
['ML', 'Python,', 'Scala,']


In [22]:
withReplacement = True
numberToTake = 2
randomSeed = 100

# The parameter withReplacement controls the Uniqueness of sample result
# If we treat a Dataset as a bucket of balls, withReplacement=true means, taking a random ball out of the bucket and place it back into it
# That means, the same ball can be picked up again.

print(words.takeSample(withReplacement, numberToTake)) # random sample from RDD

['Spark,', 'on']


### _RDD TXT Save (Uncompressed & Compressed) Example_

In [30]:
import shutil
shutil.rmtree("/tmp/words_atin")

words.saveAsTextFile("/tmp/words_atin")

In [31]:
import os
for i in os.listdir("/tmp/words_atin"): print(i)

part-00001
part-00000
_SUCCESS
.part-00001.crc
.part-00000.crc
._SUCCESS.crc


In [32]:
!head /tmp/words_atin/part-00000
print("\n")
!head /tmp/words_atin/part-00001

Training
on
Python,


Spark,
Scala,
ML


In [34]:
import shutil
#shutil.rmtree("/tmp/wordsCompressed_atin")

words.saveAsTextFile("/tmp/wordsCompressed_atin", \
                     compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")

In [35]:
import os
for i in os.listdir("/tmp/wordsCompressed_atin/"): print(i)

part-00000.gz
_SUCCESS
.part-00001.gz.crc
.part-00000.gz.crc
part-00001.gz
._SUCCESS.crc


### _RDD Caching Example_

In [66]:
words.cache()

myWords ParallelCollectionRDD[20] at readRDDFromFile at PythonRDD.scala:247

### _RDD Checkpointing Example_

In [90]:
#shutil.rmtree("/tmp/checkpointRDD")
spark.sparkContext.setCheckpointDir("/tmp/checkpointRDD")
words.checkpoint()

## Distributed Shared Variables_

-  Broadcast Variables:
    -  saves large value on all worker nodes without re-sending to cluster every time (ex: lookup table as function that fits in memory on each executor)
    -  avoids deserialization per task on the worker nodes every time variable is used
    -  shared immutable variables that are cached on every machine in cluster instead of serialized with every single task
    -  the cost of serializing data for every task can be quite expensive thus broadcast variables are a good alternative   
<br>
-  Accumulators:
    -  adds data together from all tasks into a shared result (ex: error logging counter and debugging)
    -  mutable variable that updates value via transformations and sends value to driver node in an efficient manner


In [92]:
my_collection = "Corporate Training for - Spark, Scala, Python".split(" ")
words = spark.sparkContext.parallelize(my_collection, 2)

### _Broadcast Example_:

In [40]:
supplementalData = {"ATT":1000, "Corporate":200,
                    "Training":-300, "days":100}

In [41]:
suppBroadcast = spark.sparkContext.broadcast(supplementalData)

In [42]:
suppBroadcast.value

{'ATT': 1000, 'Corporate': 200, 'Training': -300, 'days': 100}

In [47]:
words.map(lambda word: (word, suppBroadcast.value.get(word, 0))).sortBy(lambda wordPair: wordPair[1]).collect()

[('Training', -300),
 ('on', 0),
 ('Python,', 0),
 ('Spark,', 0),
 ('Scala,', 0),
 ('ML', 0)]

### _Accumulator Example_:

In [48]:
print(flightData2010)

../data/flight-data/parquet/2010-summary.parquet


In [49]:
flights = spark.read.parquet(flightData2010)
flights.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|    1|
|       United States|            Ireland|  264|
|       United States|              India|   69|
|               Egypt|      United States|   24|
|   Equatorial Guinea|      United States|    1|
|       United States|          Singapore|   25|
|       United States|            Grenada|   54|
|          Costa Rica|      United States|  477|
|             Senegal|      United States|   29|
|       United States|   Marshall Islands|   44|
|              Guyana|      United States|   17|
|       United States|       Sint Maarten|   53|
|               Malta|      United States|    1|
|             Bolivia|      United States|   46|
|            Anguilla|      United States|   21|
|Turks and Caicos ...|      United States|  136|
|       United States|        Afghanistan|    2|
|Saint Vincent and..

In [50]:
# count number of flights to or from China
accChina = spark.sparkContext.accumulator(0)

def accChinaFunc(flight_row):
  destination = flight_row["DEST_COUNTRY_NAME"]
  origin = flight_row["ORIGIN_COUNTRY_NAME"]
  if destination == "China":
    accChina.add(flight_row["count"])
  if origin == "China":
    accChina.add(flight_row["count"])

# runs foreach row in the input DF (flights) and runs function against each row
flights.foreach(lambda flight_row: accChinaFunc(flight_row))

In [39]:
accChina.value

953

In [1]:
myCollection = "Python Spark PySpark DataBricks Advanced Training"\
  .split(" ")
words = spark.sparkContext.parallelize(myCollection, 2)

NameError: name 'spark' is not defined

In [None]:
words.map(lambda word: (word.lower(), 1))

In [None]:
keyword = words.keyBy(lambda word: word.lower()[0])

In [None]:
keyword.mapValues(lambda word: word.upper()).collect()

In [None]:
keyword.flatMapValues(lambda word: word.upper()).collect()

In [None]:
keyword.keys().collect()
keyword.values().collect()

In [None]:
import random
distinctChars = words.flatMap(lambda word: list(word.lower())).distinct()\
  .collect()
sampleMap = dict(map(lambda c: (c, random.random()), distinctChars))
words.map(lambda word: (word.lower()[0], word))\
  .sampleByKey(True, sampleMap, 6).collect()

In [None]:
chars = words.flatMap(lambda word: word.lower())
KVcharacters = chars.map(lambda letter: (letter, 1))
def maxFunc(left, right):
  return max(left, right)
def addFunc(left, right):
  return left + right
nums = sc.parallelize(range(1,31), 5)

In [None]:
KVcharacters.countByKey()

In [None]:
KVcharacters.groupByKey().map(lambda row: (row[0], reduce(addFunc, row[1])))\
  .collect()

In [None]:
nums.aggregate(0, maxFunc, addFunc)

In [None]:
depth = 3
nums.treeAggregate(0, maxFunc, addFunc, depth)

In [None]:
KVcharacters.aggregateByKey(0, addFunc, maxFunc).collect()

In [None]:
def valToCombiner(value):
  return [value]
def mergeValuesFunc(vals, valToAppend):
  vals.append(valToAppend)
  return vals
def mergeCombinerFunc(vals1, vals2):
  return vals1 + vals2
outputPartitions = 6
KVcharacters\
  .combineByKey(
    valToCombiner,
    mergeValuesFunc,
    mergeCombinerFunc,
    outputPartitions)\
  .collect()

In [None]:
KVcharacters.foldByKey(0, addFunc).collect()

In [None]:
import random
distinctChars = words.flatMap(lambda word: word.lower()).distinct()
charRDD = distinctChars.map(lambda c: (c, random.random()))
charRDD2 = distinctChars.map(lambda c: (c, random.random()))
charRDD.cogroup(charRDD2).take(5)

In [None]:
keyedChars = distinctChars.map(lambda c: (c, random.random()))
outputPartitions = 10
KVcharacters.join(keyedChars).count()
KVcharacters.join(keyedChars, outputPartitions).count()

In [None]:
numRange = sc.parallelize(range(10), 2)
words.zip(numRange).collect()

In [None]:
words.coalesce(1).getNumPartitions() # 1

In [None]:
df = spark.read.option("header", "true").option("inferSchema", "true")\
  .csv("/data/retail-data/all/")
rdd = df.coalesce(10).rdd

In [None]:
def partitionFunc(key):
  import random
  if key == 17850 or key == 12583:
    return 0
  else:
    return random.randint(1,2)

keyedRDD = rdd.keyBy(lambda row: row[6])
keyedRDD\
  .partitionBy(3, partitionFunc)\
  .map(lambda x: x[0])\
  .glom()\
  .map(lambda x: len(set(x)))\
  .take(5)