In [2]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/24 00:54:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/05/24 00:54:55 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/05/24 00:54:55 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [3]:
retailAll = '/tmp/data/retail-data/all/'
flightData2010 = '/tmp/data/flight-data/parquet/2010-summary.parquet'

### Resilient Distributed Datasets (RDDs)

-  **Spark operates on a per-partition basis when executing code**
-  Basically all DF Spark code compiles down to an RDD
-  When calling a DF transformation, the underlying logic becomes a set of RDD transformations
-  SparkContext is the entry point for low-level API functionality accessed through SparkSession
-  Main reason to use RDDs is for fine grained control over physical distribution of data (**custom partitioning of data**)
-  RDD performance is best via Scala/Java
<br>
-  RDDs:
    -  Caching:
        -  ability to cache or persist an RDD
        -  ability to specify a storage level [org.apache.spark.storage.StorageLevel] (combinations of memory only, disk only, and off heap)
    -  Checkpointing:
        -  saves RDD to disk so future computations on that RDD point to its partitions on disk rather than recomputing the RDD from the original source
        -  similar to caching except checkpointing is stored only on disk and not in memory (like cache)
        -  when checkpointed RDD is referenced it derives from checkpoint instead of source data, which helps improve performance and optimization
        <br>
-  Shared Variables:
    -  broadcast variables
    -  accumulators

### _RDD to DF Example_

In [4]:
rdd1 = spark.range(3).rdd
rdd1.collect()

[Row(id=0), Row(id=1), Row(id=2)]

In [5]:
for i in spark.range(3).rdd.collect(): print(i)

Row(id=0)
Row(id=1)
Row(id=2)


In [6]:
spark.range(3).rdd.toDF().show()

[Stage 2:>                                                          (0 + 1) / 1]                                                                                

+---+
| id|
+---+
|  0|
|  1|
|  2|
+---+



### _Local Collection to RDD Example_

In [7]:
myCollection = "Training on Python, Spark, Scala and on ML".split(" ")


In [8]:
myCollection

['Training', 'on', 'Python,', 'Spark,', 'Scala', 'and', 'on', 'ML']

In [9]:
words = spark.sparkContext.parallelize(myCollection, 2) # sets number of partitions

In [10]:
words.getNumPartitions()

2

In [11]:
words.setName("myWords") # assigns a name to this RDD (not to the Spark application)
for i in words.collect(): print(i)

Training
on
Python,
Spark,
Scala
and
on
ML


### _RDD Data Source Read Example_

### RDD Transformation Examples:

In [12]:
# distinct
print(words.distinct().count())

[Stage 8:>                                                          (0 + 2) / 2]

7


                                                                                

In [13]:
print(words.collect())
print(words.distinct().collect())

['Training', 'on', 'Python,', 'Spark,', 'Scala', 'and', 'on', 'ML']
['Spark,', 'Training', 'on', 'Python,', 'Scala', 'and', 'ML']


In [14]:
# filter
def startsWithS(individual):
  """Tell whether `individual` begins with an uppercase "S".

  Args:
    individual: the string to inspect.

  Returns:
    True when the first character is "S" (case-sensitive), otherwise False.
  """
  has_s_prefix = individual.startswith("S")
  return has_s_prefix

print(words.filter(lambda word: startsWithS(word)).collect())


['Spark,', 'Scala']


In [15]:
# map

words2 = words.map(
    lambda word: 
        (
            word, word[0], word.startswith("S")
        )
)

In [16]:
words2.collect()

[('Training', 'T', False),
 ('on', 'o', False),
 ('Python,', 'P', False),
 ('Spark,', 'S', True),
 ('Scala', 'S', True),
 ('and', 'a', False),
 ('on', 'o', False),
 ('ML', 'M', False)]

In [17]:
print(words2.filter(
    lambda record: record[2]
).collect())

[('Spark,', 'S', True), ('Scala', 'S', True)]


In [18]:
# sort
print(words.sortBy(lambda word: len(word) * -1).collect())

['Training', 'Python,', 'Spark,', 'Scala', 'and', 'on', 'on', 'ML']


### RDD Actions Examples

In [19]:
# reduce
print(spark.sparkContext.parallelize(range(1, 21)).reduce(lambda x, y: x + y))

210


In [20]:
words.collect()

['Training', 'on', 'Python,', 'Spark,', 'Scala', 'and', 'on', 'ML']

In [21]:
# countByValue
print(words.countByValue())

defaultdict(<class 'int'>, {'Training': 1, 'on': 2, 'Python,': 1, 'Spark,': 1, 'Scala': 1, 'and': 1, 'ML': 1})


In [22]:
# first
print(words.first())

Training


In [23]:
# take
print(words.take(3)) # returns values
print(words.takeOrdered(3)) # asc order

['Training', 'on', 'Python,']
['ML', 'Python,', 'Scala']


### _RDD TXT Save (Uncompressed & Compressed) Example_

In [24]:
import shutil

# saveAsTextFile refuses to write into a directory that already exists, so
# clear any output left by a previous run of this cell. ignore_errors=True
# makes the cleanup a no-op the first time, when the directory is absent.
shutil.rmtree("words_atin", ignore_errors=True)

# Writes one part-file per partition of `words` into the "words_atin" directory.
words.saveAsTextFile("words_atin")

In [25]:
import os
for i in os.listdir("words_atin"): print(i)

part-00001
part-00000
_SUCCESS
._SUCCESS.crc
.part-00001.crc
.part-00000.crc


In [26]:
import shutil

# saveAsTextFile refuses to write into a directory that already exists, so
# clear any output left by a previous run of this cell. ignore_errors=True
# makes the cleanup a no-op the first time, when the directory is absent.
shutil.rmtree("wordsCompressed_atin", ignore_errors=True)

# Same save as the uncompressed example, but each part-file is gzip-compressed
# through the Hadoop codec class named below.
words.saveAsTextFile(
    "wordsCompressed_atin",
    compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec",
)

In [27]:
import os
for i in os.listdir("wordsCompressed_atin/"): print(i)

.part-00000.gz.crc
part-00001.gz
_SUCCESS
._SUCCESS.crc
.part-00001.gz.crc
part-00000.gz


## Distributed Shared Variables

-  Broadcast Variables:
    -  saves large value on all worker nodes without re-sending to cluster every time (ex: lookup table as function that fits in memory on each executor)
    -  avoids deserialization per task on the worker nodes every time variable is used
    -  shared immutable variables that are cached on every machine in cluster instead of serialized with every single task
    -  the cost of serializing data for every task can be quite expensive thus broadcast variables are a good alternative   
<br>
-  Accumulators:
    -  adds data together from all tasks into a shared result (ex: error logging counter and debugging)
    -  mutable variable that updates value via transformations and sends value to driver node in an efficient manner


In [4]:
# NOTE(review): this cell's execution count (In [4]) is out of sequence, and the
# `words.collect()` output shown further down in the broadcast example still lists
# the earlier "Training on Python, ..." collection rather than this one. Confirm
# whether `words` is meant to be rebuilt here, then re-run the notebook top to bottom.
my_collection = "Corporate Training for - Spark, Scala, Python".split(" ")
# Distribute the local list as an RDD with 2 partitions.
words = spark.sparkContext.parallelize(my_collection, 2)

### _Broadcast Example_:

In [29]:
supplementalData = {"ATT":1000, "Corporate":200,
                    "Training":-300, "days":100}

In [30]:
type(supplementalData)

dict

In [31]:
suppBroadcast = spark.sparkContext.broadcast(supplementalData)

In [32]:
suppBroadcast.value

{'ATT': 1000, 'Corporate': 200, 'Training': -300, 'days': 100}

In [33]:
type(suppBroadcast)

pyspark.broadcast.Broadcast

In [34]:
words.collect()

['Training', 'on', 'Python,', 'Spark,', 'Scala', 'and', 'on', 'ML']

In [35]:
words.map(lambda word: (word, suppBroadcast.value.get(word, 0))).collect()

[('Training', -300),
 ('on', 0),
 ('Python,', 0),
 ('Spark,', 0),
 ('Scala', 0),
 ('and', 0),
 ('on', 0),
 ('ML', 0)]

In [36]:
words.map(lambda word: (word, suppBroadcast.value.get(word, 0))).collect()

[('Training', -300),
 ('on', 0),
 ('Python,', 0),
 ('Spark,', 0),
 ('Scala', 0),
 ('and', 0),
 ('on', 0),
 ('ML', 0)]

### _Accumulator Example_:

In [37]:
print(flightData2010)

/tmp/data/flight-data/parquet/2010-summary.parquet


In [38]:
flights = spark.read.parquet(flightData2010)
flights.show()

                                                                                

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|    1|
|       United States|            Ireland|  264|
|       United States|              India|   69|
|               Egypt|      United States|   24|
|   Equatorial Guinea|      United States|    1|
|       United States|          Singapore|   25|
|       United States|            Grenada|   54|
|          Costa Rica|      United States|  477|
|             Senegal|      United States|   29|
|       United States|   Marshall Islands|   44|
|              Guyana|      United States|   17|
|       United States|       Sint Maarten|   53|
|               Malta|      United States|    1|
|             Bolivia|      United States|   46|
|            Anguilla|      United States|   21|
|Turks and Caicos ...|      United States|  136|
|       United States|        Afghanistan|    2|
|Saint Vincent and..

In [39]:
# count number of flights to or from China
accChina = spark.sparkContext.accumulator(0)

def accChinaFunc(flight_row):
  """Add a row's "count" to the `accChina` accumulator for each China endpoint.

  The row's "count" value is added once when DEST_COUNTRY_NAME is "China" and
  once more when ORIGIN_COUNTRY_NAME is "China", so a row with China on both
  ends contributes twice.

  Args:
    flight_row: row indexed by column name; must provide "DEST_COUNTRY_NAME"
      and "ORIGIN_COUNTRY_NAME", plus "count" when either of them is "China".
  """
  # Both endpoint columns are read before any update is made.
  endpoints = (
    flight_row["DEST_COUNTRY_NAME"],
    flight_row["ORIGIN_COUNTRY_NAME"],
  )
  for country in endpoints:
    if country == "China":
      accChina.add(flight_row["count"])

# foreach is an action: it calls accChinaFunc once for every row of the `flights` DF
flights.foreach(accChinaFunc)

In [40]:
accChina.value

953

In [41]:
myCollection = "Python Spark PySpark DataBricks Advanced Training"\
  .split(" ")
words = spark.sparkContext.parallelize(myCollection, 2)

In [42]:
words.map(lambda word: (word.lower(), 1)).collect()

[('python', 1),
 ('spark', 1),
 ('pyspark', 1),
 ('databricks', 1),
 ('advanced', 1),
 ('training', 1)]

In [43]:
keyword = words.keyBy(lambda word: word.lower()[0])
keyword.collect()

[('p', 'Python'),
 ('s', 'Spark'),
 ('p', 'PySpark'),
 ('d', 'DataBricks'),
 ('a', 'Advanced'),
 ('t', 'Training')]

In [44]:
keyword.mapValues(lambda word: word.upper()).collect()

[('p', 'PYTHON'),
 ('s', 'SPARK'),
 ('p', 'PYSPARK'),
 ('d', 'DATABRICKS'),
 ('a', 'ADVANCED'),
 ('t', 'TRAINING')]