#### Launch spark session

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark import StorageLevel
from pyspark.sql.types import *

from datetime import datetime, date
import pandas as pd
import numpy as np
import re
from bs4 import BeautifulSoup
from IPython.display import display
from tqdm import tqdm
import time

pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

# spark = SparkSession.builder \
#     .master("spark://spark-master:7077") \
#     .config("spark.executor.instances", 4) \
#     .config("spark.executor.memory", "2g") \
#     .config("spark.executor.cores", "2") \
#     .config("spark.driver.cores", "4") \
#     .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
#     .config("spark.hadoop.fs.s3a.access.key", "root") \
#     .config("spark.hadoop.fs.s3a.secret.key", "root12345") \
#     .config("spark.hadoop.fs.s3a.path.style.access", "true") \
#     .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.1,com.crealytics:spark-excel_2.12:0.13.5") \
#     .config("spark.driver.memory", "6g") \
#     .config("spark.driver.maxResultSize", "3g") \
#     .config("spark.deploy.defaultCores", 2) \
#     .config("spark.dynamicAllocation.enabled", True) \
#     .appName("MySparkApp") \
#     .getOrCreate()

# spark.sparkContext.setLogLevel("DEBUG")

In [2]:
def make_spark_session():
    spark = SparkSession.builder \
        .master("spark://spark-master:7077") \
        .config("spark.executor.instances", 4) \
        .config("spark.executor.memory", "2g") \
        .config("spark.executor.cores", "2") \
        .config("spark.driver.cores", "4") \
        .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
        .config("spark.hadoop.fs.s3a.access.key", "root") \
        .config("spark.hadoop.fs.s3a.secret.key", "root12345") \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.1,com.crealytics:spark-excel_2.12:0.13.5") \
        .config("spark.driver.memory", "6g") \
        .config("spark.driver.maxResultSize", "3g") \
        .config("spark.deploy.defaultCores", 2) \
        .config("spark.dynamicAllocation.enabled", True) \
        .appName("MySparkApp") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("DEBUG")

    return spark

In [2]:
%%time
l = [('Alice', 1)]

l = spark.createDataFrame(l, ['name', 'age'])

l.collect()

print(l.is_cached)
l.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
print(l.is_cached)
l.count()
l.unpersist()

False
True
CPU times: user 19.9 ms, sys: 516 µs, total: 20.4 ms
Wall time: 11.6 s


DataFrame[name: string, age: bigint]

In [3]:
# spark.stop()

In [4]:
lines = spark.sparkContext.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
counts.collect()

[('asdasdas', 2), ('asdr23rsaa', 4), ('asdasdaswe', 1)]

In [5]:
# counts = pairs.reduce(lambda a, b: a + b)
# counts

In [6]:
l = spark.sparkContext.parallelize([('a', 1), ('b', 2), ('a', 3), ('c', 3), ('c', 4), ('c', 5)])
print(l.reduceByKey(lambda x, y: x + y).collect())

l = spark.sparkContext.parallelize([1,2,3,4,5])
l.reduce(lambda x, y: x + y)

[('b', 2), ('c', 12), ('a', 4)]


15

In [7]:
('a',1) + ('a', 1)

('a', 1, 'a', 1)

In [8]:
sentences = spark.sparkContext.parallelize(["Hello world", "How are you"])

# Apply flatMap to split each sentence into words
words = sentences.flatMap(lambda sentence: sentence.split())

# Collect the results
result = words.collect()

result

['Hello', 'world', 'How', 'are', 'you']

In [9]:
accum = spark.sparkContext.accumulator(0)

def g(x):
    accum.add(1)
    return x + x

data = spark.sparkContext.parallelize([1,2,3])

data.map(g).collect()

[2, 4, 6]

In [10]:
data.map(g).collect()
accum

Accumulator<id=0, value=6>

### Data skewness

#### Generate big DataFrame

In [2]:
%%time
# Define the size of the DataFrame
num_records = 10**7

# Generate names
names = ['Nicholas', 'Ashley', 'Elizabeth', 'Emily']

# Define the distribution of random integers
# 85% of random integers correspond to the first name
# 15% of random integers are evenly distributed across the other three names
# Generate random integers according to the specified distribution
random_ints = np.random.choice(len(names), size=num_records, p=[0.85, 0.05, 0.05, 0.05])

# Create the DataFrame
df = pd.DataFrame({'name': [names[i] for i in random_ints],
                   'random_int': np.random.randint(1000, 10000, size=num_records)})

spark_df = spark.createDataFrame(df)

spark_df.write.partitionBy('name').mode('overwrite').parquet('s3a://spark/big_table')

CPU times: user 1min 31s, sys: 1.27 s, total: 1min 32s
Wall time: 2min 11s


#### Append data to minio

In [2]:
%%time
num_records = 10**7

names = ['Nicholas', 'Ashley', 'Elizabeth', 'Emily']

# Generate random integers according to the specified distribution
random_ints = np.random.choice(len(names), size=num_records, p=[0.85, 0.05, 0.05, 0.05])

for _ in tqdm(range(5)):
    # Create the DataFrame
    df = pd.DataFrame({'name': [names[i] for i in random_ints],
                       'random_int': np.random.randint(1, 1000000, size=num_records)})
    
    spark.createDataFrame(df) \
        .write \
        .partitionBy('name') \
        .mode('append') \
        .parquet('s3a://spark/big_table')

100%|██████████| 5/5 [10:56<00:00, 131.26s/it]

CPU times: user 7min 23s, sys: 10.1 s, total: 7min 33s
Wall time: 10min 56s





#### Read tables

In [4]:
%%time
big_table = spark.read.parquet('s3a://spark/big_table')

little_table = big_table \
    .select('name') \
    .distinct() \
    .withColumn('some_flag', 
        F.when(F.col('name') == 'Nicholas', 1) \
         .when(F.col('name') == 'Ashley', 2) \
         .when(F.col('name') == 'Emily', 3) \
         .when(F.col('name') == 'Elizabeth', 4) \
         .otherwise(0)
    )

little_table.show()

+---------+---------+
|     name|some_flag|
+---------+---------+
| Nicholas|        1|
|   Ashley|        2|
|    Emily|        3|
|Elizabeth|        4|
+---------+---------+

CPU times: user 4.73 ms, sys: 7.95 ms, total: 12.7 ms
Wall time: 23 s


### Join (cache and broadcast hint)

##### Raw join

In [3]:
spark = make_spark_session()

big_table = spark.read.parquet('s3a://spark/big_table')

little_table = big_table \
    .select('name') \
    .distinct() \
    .withColumn('some_flag', 
        F.when(F.col('name') == 'Nicholas', 1) \
         .when(F.col('name') == 'Ashley', 2) \
         .when(F.col('name') == 'Emily', 3) \
         .when(F.col('name') == 'Elizabeth', 4) \
         .otherwise(0)
    )

In [4]:
%%time
joined_v0 = big_table.join(little_table, on=[big_table.name == little_table.name], how='inner')

joined_v0.explain()

joined_v0.show(10, False)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [name#1], [name#10], Inner
   :- Sort [name#1 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(name#1, 200), ENSURE_REQUIREMENTS, [plan_id=23]
   :     +- FileScan parquet [random_int#0L,name#1] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark/big_table], PartitionFilters: [isnotnull(name#1)], PushedFilters: [], ReadSchema: struct<random_int:bigint>
   +- Sort [name#10 ASC NULLS FIRST], false, 0
      +- HashAggregate(keys=[name#10], functions=[])
         +- Exchange hashpartitioning(name#10, 200), ENSURE_REQUIREMENTS, [plan_id=19]
            +- HashAggregate(keys=[name#10], functions=[])
               +- FileScan parquet [name#10] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark/big_table], PartitionFilters: [isnotnull(name#10)], PushedFilters: [], ReadSchema: struct<>


+----------+--------+--------+

In [5]:
spark.stop()

##### Repartitioned data join

In [6]:
spark = make_spark_session()

big_table = spark.read.parquet('s3a://spark/big_table').orderBy('name').repartition('name')

little_table = big_table \
    .select('name') \
    .distinct() \
    .withColumn('some_flag', 
        F.when(F.col('name') == 'Nicholas', 1) \
         .when(F.col('name') == 'Ashley', 2) \
         .when(F.col('name') == 'Emily', 3) \
         .when(F.col('name') == 'Elizabeth', 4) \
         .otherwise(0)
    )

In [7]:
%%time
joined_v1 = big_table.join(little_table, on=[big_table.name == little_table.name], how='inner')

joined_v1.explain()

joined_v1.show(10, False)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [name#38], [name#47], Inner
   :- Sort [name#38 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(name#38, 200), REPARTITION_BY_COL, [plan_id=196]
   :     +- FileScan parquet [random_int#37L,name#38] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark/big_table], PartitionFilters: [isnotnull(name#38)], PushedFilters: [], ReadSchema: struct<random_int:bigint>
   +- Sort [name#47 ASC NULLS FIRST], false, 0
      +- HashAggregate(keys=[name#47], functions=[])
         +- HashAggregate(keys=[name#47], functions=[])
            +- Exchange hashpartitioning(name#47, 200), REPARTITION_BY_COL, [plan_id=198]
               +- FileScan parquet [name#47] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark/big_table], PartitionFilters: [isnotnull(name#47)], PushedFilters: [], ReadSchema: struct<>


+----------+------+-----

In [8]:
spark.stop()

##### Repartitioned and sorted data join

In [9]:
spark = make_spark_session()

big_table = spark.read.parquet('s3a://spark/big_table').orderBy('name').repartition('name')

little_table = big_table \
    .select('name') \
    .distinct() \
    .withColumn('some_flag', 
        F.when(F.col('name') == 'Nicholas', 1) \
         .when(F.col('name') == 'Ashley', 2) \
         .when(F.col('name') == 'Emily', 3) \
         .when(F.col('name') == 'Elizabeth', 4) \
         .otherwise(0)
    )

In [10]:
%%time
joined_v1 = big_table.join(little_table, on=[big_table.name == little_table.name], how='inner')

joined_v1.explain()

joined_v1.show(10, False)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [name#75], [name#84], Inner
   :- Sort [name#75 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(name#75, 200), REPARTITION_BY_COL, [plan_id=371]
   :     +- FileScan parquet [random_int#74L,name#75] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark/big_table], PartitionFilters: [isnotnull(name#75)], PushedFilters: [], ReadSchema: struct<random_int:bigint>
   +- Sort [name#84 ASC NULLS FIRST], false, 0
      +- HashAggregate(keys=[name#84], functions=[])
         +- HashAggregate(keys=[name#84], functions=[])
            +- Exchange hashpartitioning(name#84, 200), REPARTITION_BY_COL, [plan_id=373]
               +- FileScan parquet [name#84] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark/big_table], PartitionFilters: [isnotnull(name#84)], PushedFilters: [], ReadSchema: struct<>


+----------+------+-----

In [11]:
spark.stop()

##### Repartitioned and cached data join

In [12]:
spark = make_spark_session()

big_table = spark.read.parquet('s3a://spark/big_table').repartition('name')
big_table.cache()
big_table.count()

little_table = big_table \
    .select('name') \
    .distinct() \
    .withColumn('some_flag', 
        F.when(F.col('name') == 'Nicholas', 1) \
         .when(F.col('name') == 'Ashley', 2) \
         .when(F.col('name') == 'Emily', 3) \
         .when(F.col('name') == 'Elizabeth', 4) \
         .otherwise(0)
    )

In [13]:
little_table.cache()
little_table.count()
little_table.is_cached

True

In [14]:
%%time
joined_v1 = big_table.join(little_table, on=[big_table.name == little_table.name], how='inner')

joined_v1.explain()

joined_v1.show(10, False)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [name#112], [name#295], Inner, BuildRight, false
   :- Filter isnotnull(name#112)
   :  +- InMemoryTableScan [random_int#111L, name#112], [isnotnull(name#112)]
   :        +- InMemoryRelation [random_int#111L, name#112], StorageLevel(disk, memory, deserialized, 1 replicas)
   :              +- Exchange hashpartitioning(name#112, 200), REPARTITION_BY_COL, [plan_id=538]
   :                 +- *(1) ColumnarToRow
   :                    +- FileScan parquet [random_int#111L,name#112] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark/big_table], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<random_int:bigint>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [plan_id=639]
      +- Filter isnotnull(name#295)
         +- InMemoryTableScan [name#295, some_flag#184], [isnotnull(name#295)]
               +- InMemoryRela

In [15]:
%%time
big_table.unpersist()
big_table.count()
big_table.is_cached

CPU times: user 2.68 ms, sys: 2.5 ms, total: 5.17 ms
Wall time: 12 s


False

In [16]:
little_table.unpersist()
little_table.count()
little_table.is_cached

False

In [17]:
spark.stop()

##### Sorted, repartitioned, cached data join

In [18]:
%%time
spark = make_spark_session()

big_table = spark.read.parquet('s3a://spark/big_table').orderBy('name').repartition('name')
big_table.cache()
big_table.count()

little_table = big_table \
    .select('name') \
    .distinct() \
    .withColumn('some_flag', 
        F.when(F.col('name') == 'Nicholas', 1) \
         .when(F.col('name') == 'Ashley', 2) \
         .when(F.col('name') == 'Emily', 3) \
         .when(F.col('name') == 'Elizabeth', 4) \
         .otherwise(0)
    )

CPU times: user 34.2 ms, sys: 17 ms, total: 51.2 ms
Wall time: 1min 26s


In [19]:
%%time
joined_v1 = big_table.join(little_table, on=[big_table.name == little_table.name], how='inner')

joined_v1.explain()

joined_v1.show(10, False)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [name#487], [name#563], Inner
   :- Sort [name#487 ASC NULLS FIRST], false, 0
   :  +- Filter isnotnull(name#487)
   :     +- InMemoryTableScan [random_int#486L, name#487], [isnotnull(name#487)]
   :           +- InMemoryRelation [random_int#486L, name#487], StorageLevel(disk, memory, deserialized, 1 replicas)
   :                 +- Exchange hashpartitioning(name#487, 200), REPARTITION_BY_COL, [plan_id=896]
   :                    +- *(1) ColumnarToRow
   :                       +- FileScan parquet [random_int#486L,name#487] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark/big_table], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<random_int:bigint>
   +- Sort [name#563 ASC NULLS FIRST], false, 0
      +- HashAggregate(keys=[name#563], functions=[])
         +- HashAggregate(keys=[name#563], functions=[])
            +- Filter isnotnull(name#563)
     

##### Broadcast hint works!

Here two identical operations with and without hint

In [23]:
%%time
joined_v1 = big_table.join(little_table, on=[big_table.name == little_table.name], how='inner')

joined_v1.explain()

joined_v1.show(10, False)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [name#487], [name#1103], Inner
   :- Sort [name#487 ASC NULLS FIRST], false, 0
   :  +- Filter isnotnull(name#487)
   :     +- InMemoryTableScan [random_int#486L, name#487], [isnotnull(name#487)]
   :           +- InMemoryRelation [random_int#486L, name#487], StorageLevel(disk, memory, deserialized, 1 replicas)
   :                 +- Exchange hashpartitioning(name#487, 200), REPARTITION_BY_COL, [plan_id=896]
   :                    +- *(1) ColumnarToRow
   :                       +- FileScan parquet [random_int#486L,name#487] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark/big_table], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<random_int:bigint>
   +- Sort [name#1103 ASC NULLS FIRST], false, 0
      +- HashAggregate(keys=[name#1103], functions=[])
         +- HashAggregate(keys=[name#1103], functions=[])
            +- Filter isnotnull(name#1103)


In [25]:
%%time
joined_v1 = big_table.join(little_table.hint("broadcast"), on=[big_table.name == little_table.name], how='inner')

joined_v1.explain()

joined_v1.show(10, False)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [name#487], [name#1449], Inner, BuildRight, false
   :- Filter isnotnull(name#487)
   :  +- InMemoryTableScan [random_int#486L, name#487], [isnotnull(name#487)]
   :        +- InMemoryRelation [random_int#486L, name#487], StorageLevel(disk, memory, deserialized, 1 replicas)
   :              +- Exchange hashpartitioning(name#487, 200), REPARTITION_BY_COL, [plan_id=896]
   :                 +- *(1) ColumnarToRow
   :                    +- FileScan parquet [random_int#486L,name#487] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark/big_table], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<random_int:bigint>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [plan_id=1432]
      +- HashAggregate(keys=[name#1449], functions=[])
         +- HashAggregate(keys=[name#1449], functions=[])
            +- Filter isnotnull

### Salt join