# Check MinIO

In [1]:
!pip install minio delta-spark==2.2.0

[0mCollecting minio
  Downloading minio-7.1.16-py3-none-any.whl (77 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.9/77.9 kB[0m [31m15.8 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting delta-spark==2.2.0
  Downloading delta_spark-2.2.0-py3-none-any.whl (20 kB)
Collecting pyspark<3.4.0,>=3.3.0
  Downloading pyspark-3.3.3.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m37.8 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m43.3 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.3.3-py2.py3-none-any.whl size=281891025 sha256=54c52038e330745cacf59628807

In [2]:
from minio import Minio

In [3]:
import os

# Connection settings are read from the environment so real credentials never
# have to be committed with the notebook. The fallbacks are the values this
# cell previously hard-coded, so it behaves the same when nothing is set.
client = Minio(
    os.environ.get("MINIO_ENDPOINT", "minio:9000"),
    access_key=os.environ.get("MINIO_ACCESS_KEY", "minio"),
    secret_key=os.environ.get("MINIO_SECRET_KEY", "minio123"),
    secure=False,
)

bucket = "warehouse"
if client.bucket_exists(bucket):
    print(f"{bucket} exists")
else:
    # Say so explicitly: the s3a://warehouse/... cells further down read from
    # and write to this bucket.
    print(f"{bucket} does not exist")


warehouse exists


# Init SparkContext

In [4]:
from datetime import datetime
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext

In [5]:
import pyspark
from delta import *

# Session builder for the standalone master, with Delta Lake's SQL extension
# and catalog class registered.
builder = (
    pyspark.sql.SparkSession.builder
    .appName("LocalDelta")
    .master("spark://spark-master:7077")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

# Let the delta helper finish configuring the builder, then create the session
# (or reuse one that already exists).
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [6]:
# Grab the SparkContext behind the session; the RDD cells below all go through `sc`.
sc = spark.sparkContext
sc

In [7]:
# Smoke-test Delta Lake: write a 500-row range in delta format to the
# warehouse bucket, overwriting whatever an earlier run left there.
delta_path = "s3a://warehouse/deltafile"
spark.range(500).write.format("delta").mode("overwrite").save(delta_path)

# Create RDDs

## By loading dataset

In [None]:
# Build an RDD of text lines from the test file in the warehouse bucket.
fdd = sc.textFile("s3a://warehouse/testfile.txt")
fdd

In [None]:
# How many partitions did textFile split the file into?
fdd.getNumPartitions()

In [None]:
# Action: returns the number of lines in the file.
fdd.count()

In [None]:
# Redistribute the lines over 10 partitions (rebinding `fdd`), then confirm
# the new partition count.
fdd = fdd.repartition(10)
fdd.getNumPartitions()

In [None]:
# Count again after repartitioning: the layout changed, the elements did not.
fdd.count()

In [None]:
# Pull every line back to the driver as a Python list. Only sensible when the
# file is small enough to fit in driver memory.
fdd.collect()

In [None]:
# tokenise: every line contributes one element per whitespace-separated word
words = fdd.flatMap(lambda text: text.split())

# word count: pair each word with 1, then add up the 1s that share a word
word_pairs = words.map(lambda w: (w, 1))
word_counts = word_pairs.reduceByKey(lambda left, right: left + right)
word_counts.collect()

In [None]:
# keep only the lines that contain "There are", then tokenise those lines
matching_lines = fdd.filter(lambda text: "There are" in text)
words_2 = matching_lines.flatMap(lambda text: text.split())

# same word count as before, restricted to the matching lines
word_pairs_2 = words_2.map(lambda w: (w, 1))
word_counts_2 = word_pairs_2.reduceByKey(lambda left, right: left + right)
word_counts_2.collect()

In [None]:
# Inner join on the word: each result is (word, (count over all lines,
# count over the "There are" lines)); words absent from either side drop out.
word_counts.join(word_counts_2).collect()

In [None]:
# two single-partition pair RDDs; keys 1 and 2 appear on both sides
left_pairs = [(1, "hello"), (2, "world"), (3, "foo")]
right_pairs = [(1, "bar"), (2, "baz"), (4, "qux")]
rdd1 = sc.parallelize(left_pairs, 1)
rdd2 = sc.parallelize(right_pairs, 1)

# inner join on the key
rdd_joined = rdd1.join(rdd2)

# show the joined pairs
rdd_joined.collect()

## By using parallelize

In [None]:
# Distribute a local Python list as an RDD with 2 partitions.
data = [1, 2, 3, 3]
rdd = sc.parallelize(data, 2)
rdd

In [None]:
# Confirm the partition count requested in parallelize.
rdd.getNumPartitions()

In [None]:
# Bring the elements back to the driver as a list.
rdd.collect()

# RDD operations

## Transformation

* Element-wise transformations
* Transformation filter(): Takes in a function and returns an RDD that only has elements that pass the filter( ) function

In [None]:
# Keep every element except 1.
rdd.filter(lambda x: x != 1).collect()

* Element-wise transformations
* Transformation map(): Takes in a function and applies it to each element in the RDD, with the result of the function being the new value of each element in the resulting RDD

In [None]:
# Add 1 to each element.
rdd.map(lambda x: x + 1).collect()

In [None]:
# map produces exactly one output per input, so each element becomes a
# two-item list and the result is a list of lists.
rdd.map(lambda x: [x, x + 5]).collect()

In [None]:
# flatMap flattens the returned lists, so each input contributes two elements
# to one flat result.
rdd.flatMap(lambda x: [x, x + 5]).collect()

* Sampling Transformation
* sample(): draws a random sample of an RDD; we specify whether to sample with or without replacement, and the fraction of elements to keep

In [None]:
# Sample without replacement, keeping each element with probability 0.5.
# The fixed seed makes the sample identical on every Restart & Run All;
# without it the output of this cell changes from run to run.
rdd.sample(withReplacement=False, fraction=0.5, seed=42).collect()

* Pseudo Set Operations
* RDDs support many operations of mathematical sets:
* distinct, union, intersection, subtract
* All are expensive except union, because they involve shuffling data across partitions

In [None]:
# Two small RDDs that share only the element 3. Note that this rebinds `rdd`.
rdd = sc.parallelize([1, 2, 3])
other = sc.parallelize([3, 4, 5])

In [None]:
# union concatenates the two RDDs without removing duplicates, so 3 shows up twice.
rdd.union(other).collect()

In [None]:
# Elements present in both RDDs.
rdd.intersection(other).collect()

In [None]:
# Elements of `rdd` that do not appear in `other`.
rdd.subtract(other).collect()

In [None]:
# Every (a, b) pair with a taken from `rdd` and b taken from `other`.
rdd.cartesian(other).collect()

## Actions

* fold( )
* takes a function similarly as reduce( ) does, but takes a “zero value” to be used for initial call on each partition
* should be the identity element for the operation
* 0 for +, 1 for *, etc.
* the return type is the same as the type of the RDD's elements


In [None]:
# Sum the elements with fold; 0 is the zero value (identity) for addition.
rdd = sc.parallelize([1, 2, 3, 3])
rdd.fold(0, lambda a, b: a + b)

* reduce( )
* takes a function that operates on two elements of the type in the RDD and returns a new element of the same type
* should be commutative and associative so that it can be computed correctly in parallel

In [None]:
# Same sum via reduce: addition is commutative and associative, and no zero
# value is supplied.
rdd.reduce(lambda a, b: a + b)

aggregate( )
* we can also supply an initial zero value of the type we want to return
* a 1st function to combine the elements from RDD with the accumulator
* a 2nd function to merge two accumulators given that each node accumulates its own results locally

In [None]:
def seq_op(acc, value):
    """Fold one RDD element into a (running_sum, running_count) accumulator."""
    return (acc[0] + value, acc[1] + 1)


def comb_op(acc1, acc2):
    """Merge two (sum, count) accumulators into one."""
    return (acc1[0] + acc2[0], acc1[1] + acc2[1])


# (0, 0) is the zero value: an empty sum and an empty count
sumCount = rdd.aggregate((0, 0), seq_op, comb_op)

# show sum, count and mean
total, n_items = sumCount
total, n_items, total / float(n_items)

collect( )
* return the entire RDD’s contents to the driver program

take( )
* returns n elements from the RDD
* attempts to minimize the number of partitions it accesses, so the elements returned may be a biased sample of the RDD

top(n)
* return the top n elements of the RDD

count( )
* returns the number of elements in the RDD

In [None]:
# Entire RDD contents, returned to the driver.
rdd.collect()

In [None]:
# Two elements from the RDD.
rdd.take(2)

In [None]:
# Two elements ordered by the key -x: negating the key means the largest
# values come first.
rdd.takeOrdered(2, key=lambda x: -x)

In [None]:
# One randomly chosen element, sampled without replacement. The fixed seed
# keeps the result stable across re-runs of the notebook.
rdd.takeSample(False, 1, seed=42)

In [None]:
# The top 2 elements of the RDD.
rdd.top(2)

In [None]:
# Number of elements in the RDD.
rdd.count()

In [None]:
# How many times each distinct value occurs, returned to the driver as a mapping.
rdd.countByValue()

# Caching RDDs

In [None]:
# Read the same test file again; the second argument (4) is the minimum number
# of partitions to request.
lines = sc.textFile("s3a://warehouse/testfile.txt", 4)
lines

Count will cause Spark to
* read data
* sum within partitions
* combine sums in driver

In [None]:
# First action on `lines`: the file is read here.
lines.count()

In [None]:
# Keep only the non-empty lines.
paragraphs = lines.filter(lambda x: len(x) > 0)
paragraphs

Count will cause Spark to
* read data (again)
* sum within partitions
* combine sums in driver

In [None]:
# `lines` was not cached, so this action reads the file a second time.
paragraphs.count()

In [None]:
# Recreate the RDD, this time marking it for caching.
lines = sc.textFile("s3a://warehouse/testfile.txt", 4)

# save, don't compute: cache() only marks the RDD; nothing is read until an action runs
lines.cache()

In [None]:
# Define the filter on top of the cached RDD, then run the first action on
# `lines` — this is the run that should populate the cache.
paragraphs = lines.filter(lambda x: len(x) > 0)
print(lines.count())

In [None]:
# `paragraphs` derives from the cached `lines`, so this count should reuse the
# cached data rather than re-reading the file.
print(paragraphs.count())

In [None]:
# remove from cache: release the cached copy of `lines` now that the demo is done
lines.unpersist()