In [1]:
from pyspark.sql import SparkSession
# Describe the session step by step: application name and cluster master first...
builder = SparkSession.builder.appName("cs544").master("spark://boss:7077")

# ...then the memory given to each executor and where the SQL warehouse lives on HDFS...
builder = builder.config("spark.executor.memory", "512M")
builder = builder.config("spark.sql.warehouse.dir", "hdfs://nn:9000/user/hive/warehouse")

# ...and finally turn on Hive support and create the session (or reuse an existing one).
spark = builder.enableHiveSupport().getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/04 03:21:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
import requests

# APIs to tell us what's going on in the Spark cluster
# (4040 is the port that the cluster view is on):
#
#   https://spark.apache.org/docs/latest/monitoring.html#rest-api
#   http://localhost:4040/api/v1/applications
#   http://localhost:4040/api/v1/applications/{app_id}/executors
#     (look for "totalTasks")

# List the applications known to the UI.  The timeout (seconds) keeps this
# cell from hanging indefinitely if the endpoint does not respond; without
# it, requests would wait forever.
r = requests.get("http://localhost:4040/api/v1/applications", timeout=10)
r.raise_for_status()  # fail loudly on a 4xx/5xx instead of parsing an error page
r.json()

[{'id': 'app-20231104032144-0007',
  'name': 'cs544',
  'attempts': [{'startTime': '2023-11-04T03:21:41.847GMT',
    'endTime': '1969-12-31T23:59:59.999GMT',
    'lastUpdated': '2023-11-04T03:21:41.847GMT',
    'duration': 150653,
    'sparkUser': 'root',
    'completed': False,
    'appSparkVersion': '3.5.0',
    'startTimeEpoch': 1699068101847,
    'endTimeEpoch': -1,
    'lastUpdatedEpoch': 1699068101847}]}]

In [3]:
# The applications endpoint returns a list; the first entry's "id" is the
# application ID needed to build the per-application REST URLs.
applications = r.json()
app_id = applications[0]["id"]
app_id

'app-20231104032144-0007'

In [5]:
# Baseline: fetch the executor list for our application.  The timeout
# (seconds) keeps the cell from hanging if the UI endpoint does not respond.
executors = requests.get(
    f"http://localhost:4040/api/v1/applications/{app_id}/executors",
    timeout=10,
)
executors.raise_for_status()

# How many tasks has each Spark worker done?
# (the loop variable is deliberately not named `exec`, which is a Python builtin)
[executor["totalTasks"] for executor in executors.json()]

[0, 0, 0]

In [7]:
# Load the parquet file from HDFS and (re)register it as the temp view
# "calls" so the queries below can refer to it by name.
calls_df = spark.read.format("parquet").load("hdfs://nn:9000/sf.parquet")
calls_df.createOrReplaceTempView("calls")

23/11/04 03:31:56 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [8]:
# Set up an imbalanced workload: take a 1% sample (with replacement) of
# "calls", squeeze it into a single partition and mark it for caching.
# Repeatedly reading that cached data then keeps landing on one worker.
# NOTE: no seed is passed to sample(), so the sampled rows (and the count
# shown below) will differ from run to run.
df = (
    spark.table("calls")
    .sample(withReplacement=True, fraction=0.01)
    .repartition(1)
    .cache()
)

In [9]:
# Run an action on the sample: this is the first job that actually computes it
# (cache() on its own is lazy), and the result is the number of sampled rows.
df.count()

                                                                                

59912

In [10]:
# Poll the executor list again now that one count() has run.  The timeout
# (seconds) keeps the cell from hanging if the UI endpoint does not respond.
executors = requests.get(
    f"http://localhost:4040/api/v1/applications/{app_id}/executors",
    timeout=10,
)
executors.raise_for_status()

# How many tasks has each Spark worker done?
# (the loop variable is deliberately not named `exec`, which is a Python builtin)
[executor["totalTasks"] for executor in executors.json()]

[0, 5, 4]

In [11]:
# The sample is cached on only one worker (the cache is not replicated) and it
# occupies a single partition, so each count() below is one task that lands on
# the worker holding that partition: 30 additional tasks pile up on 1 worker.
# To let multiple workers share this work, keep two replicas of the cached
# data, e.g. df.persist(StorageLevel.MEMORY_ONLY_2) instead of df.cache()
# (cache() itself does not accept a storage level).
for i in range(30):
    df.count()

In [12]:
# Poll the executor list once more, after the 30 repeated counts.  The timeout
# (seconds) keeps the cell from hanging if the UI endpoint does not respond.
executors = requests.get(
    f"http://localhost:4040/api/v1/applications/{app_id}/executors",
    timeout=10,
)
executors.raise_for_status()

# How many tasks has each Spark worker done?
# (the loop variable is deliberately not named `exec`, which is a Python builtin)
[executor["totalTasks"] for executor in executors.json()]

[0, 5, 34]

In [15]:
# .explain() prints the physical plan Spark has chosen for a query.
# Here it is used to inspect how the GROUP BY is executed: look for the
# Exchange step between the two HashAggregate steps in the printed plan.
group_by_query = """
SELECT
    Call_Type
    , COUNT(*) as count
FROM
    calls
GROUP BY
    Call_Type /* Force the shuffling for sake of demo */
"""
spark.sql(group_by_query).explain(mode="formatted")

== Physical Plan ==
AdaptiveSparkPlan (5)
+- HashAggregate (4)
   +- Exchange (3)
      +- HashAggregate (2)
         +- Scan parquet  (1)


(1) Scan parquet 
Output [1]: [Call_Type#3]
Batched: true
Location: InMemoryFileIndex [hdfs://nn:9000/sf.parquet]
ReadSchema: struct<Call_Type:string>

(2) HashAggregate
Input [1]: [Call_Type#3]
Keys [1]: [Call_Type#3]
Functions [1]: [partial_count(1)]
Aggregate Attributes [1]: [count#23405L]
Results [2]: [Call_Type#3, count#23406L]

(3) Exchange
Input [2]: [Call_Type#3, count#23406L]
Arguments: hashpartitioning(Call_Type#3, 200), ENSURE_REQUIREMENTS, [plan_id=839]

(4) HashAggregate
Input [2]: [Call_Type#3, count#23406L]
Keys [1]: [Call_Type#3]
Functions [1]: [count(1)]
Aggregate Attributes [1]: [count(1)#23402L]
Results [2]: [Call_Type#3, count(1)#23402L AS count#23401L]

(5) AdaptiveSparkPlan
Output [2]: [Call_Type#3, count#23401L]
Arguments: isFinalPlan=false


