Setup and Configuration

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from google.cloud import bigquery

# BigQuery client shared by the query cells; constructed with no explicit
# arguments, so project and credentials come from the environment.
client = bigquery.Client()

# Plot defaults: seaborn "whitegrid" style and a 12x6 default figure size.
sns.set_style(style="whitegrid")
plt.rcParams.update({'figure.figsize': (12, 6)})

# Fully-qualified table name interpolated into each query's FROM clause.
TABLE_ID = "du-hast-mich.customer_yeahmobi.zmaticoo"

High-Level Job Overview

In [None]:
# Count jobs per `state`, most frequent state first.
sql_job_status = f"""
SELECT
    state,
    COUNT(*) AS job_count
FROM
    `{TABLE_ID}`
GROUP BY
    state
ORDER BY
    job_count DESC;
"""

job_status_query = client.query(sql_job_status)
df_job_status = job_status_query.to_dataframe()

# One bar per state, in the order returned by the query.
ax = sns.barplot(data=df_job_status, x='state', y='job_count')
ax.set_title('Distribution of Dataproc Job Statuses', fontsize=16)
ax.set(xlabel='Job Status', ylabel='Number of Jobs')

# Write each count at the top of its bar. The frame's index label is used as
# the x position, which matches the bar position for the default 0..n-1 index.
for status_row in df_job_status.itertuples():
    ax.text(
        status_row.Index,
        status_row.job_count,
        status_row.job_count,
        color='black',
        ha="center",
    )
plt.show()

print("Insight: A high number of 'ERROR' states could indicate underlying problems with job configurations, permissions, or cluster stability.")

⏱️ Runtime vs. Elapsed Time

In [None]:
# Run time, elapsed time and their difference for succeeded jobs that have
# both values populated.
sql_runtime = f"""
SELECT
    run_time,
    elapsed_time,
    (elapsed_time - run_time) AS wait_time
FROM
    `{TABLE_ID}`
WHERE
    state = 'SUCCEEDED'
    AND run_time IS NOT NULL
    AND elapsed_time IS NOT NULL;
"""

runtime_query = client.query(sql_runtime)
df_runtime = runtime_query.to_dataframe()

# Overlay the two density histograms (each with a KDE curve) on the same axes.
# Note: `wait_time` is fetched but not plotted here.
histogram_specs = (
    ('run_time', 'skyblue', 'Run Time'),
    ('elapsed_time', 'red', 'Elapsed Time'),
)
for column_name, bar_color, legend_label in histogram_specs:
    sns.histplot(
        df_runtime[column_name],
        color=bar_color,
        kde=True,
        label=legend_label,
        stat="density",
    )
plt.title('Distribution of Job Run Time vs. Elapsed Time (in seconds)')
plt.xlabel('Time (seconds)')
plt.legend()
plt.show()

print("Insight: The distributions show a significant portion of time is spent waiting. This suggests potential bottlenecks in cluster scheduling or resource availability.")

💰 DCU and Shuffle Storage Consumption

In [None]:
# DCU hours (milliDcuSeconds / 3,600,000), shuffle storage and run time for
# succeeded jobs with a positive DCU figure.
sql_cost_metrics = f"""
SELECT
    milliDcuSeconds / 3600000.0 AS dcu_hours,
    shuffleStorageGbSeconds,
    run_time
FROM
    `{TABLE_ID}`
WHERE
    state = 'SUCCEEDED'
    AND milliDcuSeconds > 0;
"""

cost_query = client.query(sql_cost_metrics)
df_cost = cost_query.to_dataframe()

# DCU hours against run time, both axes logarithmic.
# Note: `shuffleStorageGbSeconds` is fetched but not plotted here.
cost_ax = sns.scatterplot(x='run_time', y='dcu_hours', data=df_cost)
cost_ax.set_title('DCU Hours vs. Job Runtime')
cost_ax.set_xlabel('Run Time (seconds)')
cost_ax.set_ylabel('DCU Hours')
cost_ax.set_xscale('log')
cost_ax.set_yscale('log')
plt.show()

print("Insight: The strong positive correlation is expected. However, jobs that lie far above the main trend line are inefficient, consuming high DCUs for their runtime. These are prime candidates for optimization.")

⚙️ Executor and Driver Configuration

In [None]:
# Average run time and DCU hours per (executor compute tier, executor disk
# tier) pair among succeeded jobs; only the 10 pairs with the most jobs are kept.
sql_tiers = f"""
SELECT
    properties.spark_spark_dataproc_executor_compute_tier AS executor_compute_tier,
    properties.spark_spark_dataproc_executor_disk_tier AS executor_disk_tier,
    AVG(run_time) as avg_runtime_seconds,
    AVG(milliDcuSeconds / 3600000.0) as avg_dcu_hours,
    COUNT(*) as job_count
FROM
    `{TABLE_ID}`
WHERE
    state = 'SUCCEEDED'
GROUP BY
    1, 2
ORDER BY
    job_count DESC
LIMIT 10;
"""

tiers_query = client.query(sql_tiers)
df_tiers = tiers_query.to_dataframe()

# Show the result as a rich table under a plain-text caption.
tiers_caption = "Top 10 Most Common Executor Tier Configurations:"
print(tiers_caption)
display(df_tiers)

print("\nInsight: Analyze if jobs using 'premium' tiers justify the cost with significantly lower runtimes. If 'standard' tier jobs have acceptable performance, they may offer better cost-efficiency.")

🔀 Impact of Shuffle Partitions on Performance

In [None]:
# Average shuffle storage and run time per configured shuffle-partition value,
# for succeeded jobs with positive shuffle usage. Only partition values used
# by more than 10 such jobs are kept (HAVING clause).
sql_shuffle = f"""
SELECT
    properties.spark_spark_sql_shuffle_partitions AS shuffle_partitions,
    AVG(shuffleStorageGbSeconds) as avg_shuffle_gb_seconds,
    AVG(run_time) as avg_runtime_seconds
FROM
    `{TABLE_ID}`
WHERE
    state = 'SUCCEEDED'
    AND properties.spark_spark_sql_shuffle_partitions IS NOT NULL
    AND shuffleStorageGbSeconds > 0
GROUP BY
    1
HAVING
    COUNT(*) > 10 -- Only consider partition counts used in more than 10 jobs
ORDER BY
    shuffle_partitions;
"""

df_shuffle = client.query(sql_shuffle).to_dataframe()

if df_shuffle.empty:
    print("No shuffle-partition setting was used by more than 10 matching jobs; nothing to plot.")
else:
    # Order by numeric value in pandas: if the property is stored as a string,
    # the SQL ORDER BY is lexicographic ("1000" before "200"). Values that do
    # not parse as numbers sort last. A stable sort keeps the query order for ties.
    df_shuffle_sorted = (
        df_shuffle
        .assign(
            partitions_numeric=pd.to_numeric(df_shuffle['shuffle_partitions'], errors='coerce'),
            partition_label=df_shuffle['shuffle_partitions'].astype(str),
        )
        .sort_values('partitions_numeric', kind='mergesort')
        .reset_index(drop=True)
    )
    bar_positions = list(range(len(df_shuffle_sorted)))

    fig, ax1 = plt.subplots()

    # Bars: average shuffle usage. seaborn places categorical bars at 0..n-1.
    sns.barplot(
        data=df_shuffle_sorted,
        x='partition_label',
        y='avg_shuffle_gb_seconds',
        order=list(df_shuffle_sorted['partition_label']),
        color='g',
        ax=ax1,
        alpha=0.6,
    )
    ax1.set_ylabel('Avg Shuffle GB-Seconds', color='g')
    ax1.tick_params(axis='y', labelcolor='g')
    ax1.set_xlabel('Spark Shuffle Partitions')

    # Line: average runtime on a second y-axis. It is drawn at the same 0..n-1
    # positions as the bars; plotting against the raw partition values would
    # put the points on a numeric scale that does not match the bar positions.
    ax2 = ax1.twinx()
    ax2.plot(bar_positions, df_shuffle_sorted['avg_runtime_seconds'], color='b', marker='o')
    ax2.set_ylabel('Avg Runtime (seconds)', color='b')
    ax2.tick_params(axis='y', labelcolor='b')

    ax1.set_title('Impact of Shuffle Partitions on Runtime and Shuffle Storage')
    # Rotate the labels on ax1: the twin axes' x-axis is hidden, so rotating
    # the "current" axes (ax2) would have no visible effect.
    ax1.tick_params(axis='x', rotation=45)
    plt.show()

    print("Insight: Look for the 'sweet spot'. The optimal number of partitions should minimize both runtime and shuffle usage. Very low or very high values often lead to poor performance. The graph can help identify a better default value for your jobs.")

Splitting Jobs Evenly into 5 Buckets by Run Time

In [None]:
# Split succeeded jobs into 5 NTILE buckets ordered by run_time and report,
# per bucket, the job count and the min/max run time.
sql_bucket_summary = f"""
WITH JobBuckets AS (
  SELECT
    run_time,
    -- Assign jobs to one of 5 buckets based on their run time
    NTILE(5) OVER (ORDER BY run_time) AS runtime_bucket
  FROM
    `{TABLE_ID}`
  WHERE
    state = 'SUCCEEDED' AND run_time IS NOT NULL
)
-- Summarize the characteristics of each bucket
SELECT
  runtime_bucket,
  COUNT(*) AS number_of_jobs,
  MIN(run_time) AS min_runtime,
  MAX(run_time) AS max_runtime
FROM
  JobBuckets
GROUP BY
  runtime_bucket
ORDER BY
  runtime_bucket;
"""

df_summary = client.query(sql_bucket_summary).to_dataframe()

# X-axis label per bucket. The bucket number is part of the label so labels
# stay unique even when two buckets share the same min/max (many tied run
# times); duplicate labels would be merged into a single bar by seaborn.
df_summary['runtime_range'] = [
    f"Bucket {bucket}: {low} - {high}s"
    for bucket, low, high in zip(
        df_summary['runtime_bucket'],
        df_summary['min_runtime'],
        df_summary['max_runtime'],
    )
]

if df_summary.empty:
    print("No succeeded jobs with a run_time were found; nothing to plot.")
else:
    fig, ax = plt.subplots(figsize=(14, 7))
    # `hue` mirrors `x` so the palette is applied per bar without relying on
    # the deprecated palette-without-hue form; dodge=False keeps one
    # full-width bar per category on older seaborn versions.
    sns.barplot(
        data=df_summary,
        x='runtime_range',
        y='number_of_jobs',
        hue='runtime_range',
        palette='viridis',
        dodge=False,
        ax=ax,
    )
    # Older seaborn versions draw a legend for the redundant hue; drop it.
    redundant_legend = ax.get_legend()
    if redundant_legend is not None:
        redundant_legend.remove()

    ax.set_title('Number of Jobs by Runtime Range', fontsize=16)
    ax.set_xlabel('Job Runtime Range (seconds)', fontsize=12)
    ax.set_ylabel('Number of Jobs', fontsize=12)
    plt.setp(ax.get_xticklabels(), rotation=45, ha='right')

    # Add the count on top of each bar, positioned by bar order rather than by
    # the DataFrame index.
    for bar_position, job_count in enumerate(df_summary['number_of_jobs']):
        ax.text(bar_position, job_count, job_count, color='black', ha="center", va='bottom')

    fig.tight_layout()
    plt.show()

    print("\nInsight: Since we used NTILE(5), the bar chart confirms that each defined runtime range contains an equal number of jobs. This method is excellent for segmenting your workload into distinct performance tiers (e.g., 'very fast', 'fast', 'medium', 'slow', 'very slow') for targeted analysis.")

Daily Job Counts per Runtime Bucket

In [None]:
# Bucket succeeded jobs into 5 NTILE groups by run_time, then count jobs per
# (creation date, bucket).
sql_daily_stacked = f"""
WITH JobBuckets AS (
  SELECT
    createTime,
    -- Use NTILE(5) to create 5 buckets based on run_time ordering
    NTILE(5) OVER (ORDER BY run_time) AS runtime_bucket
  FROM
    `{TABLE_ID}`
  WHERE
    state = 'SUCCEEDED' AND run_time IS NOT NULL
)
-- Summarize the daily counts for each bucket
SELECT
  EXTRACT(DATE FROM createTime) AS job_date,
  runtime_bucket,
  COUNT(*) AS number_of_jobs
FROM
  JobBuckets
GROUP BY
  job_date,
  runtime_bucket
ORDER BY
  job_date,
  runtime_bucket;
"""

daily_query = client.query(sql_daily_stacked)
df_daily = daily_query.to_dataframe()

# Reshape to one row per date and one column per bucket; a date/bucket pair
# with no jobs becomes 0.
daily_counts_wide = df_daily.pivot(
    index='job_date',
    columns='runtime_bucket',
    values='number_of_jobs',
)
df_pivot = daily_counts_wide.fillna(0)

# Stacked bars: one bar per date, one segment per runtime bucket.
ax = df_pivot.plot(kind='bar', stacked=True, figsize=(15, 8), colormap='viridis_r')

ax.set_title('Daily Job Counts Stacked by Runtime Bucket', fontsize=16)
ax.set_xlabel('Date', fontsize=12)
ax.set_ylabel('Number of Jobs', fontsize=12)
ax.legend(title='Runtime Bucket (1=Fastest, 5=Slowest)')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()

print("\nInsight: This view helps you spot trends in your daily workload. For instance, you can see if days with a high total number of jobs are composed of many quick jobs (dominated by buckets 1 and 2) or a few long-running jobs (dominated by bucket 5). A sudden increase in 'slow' jobs on a particular day might signal an issue or a change in the type of analysis being performed.")

Premium vs. Non-Premium Jobs in Each Runtime Bucket per Day

In [None]:
# Bucket succeeded jobs into 5 NTILE groups by run_time, then count jobs per
# (creation date, bucket, tier type). A job is 'Premium' only when its
# executor compute tier equals 'premium'; every other value, including a
# missing one, falls into 'Non-Premium' via the ELSE branch.
sql_tier_breakdown = f"""
WITH JobBuckets AS (
  SELECT
    createTime,
    properties.spark_spark_dataproc_executor_compute_tier AS executor_compute_tier,
    -- Use NTILE(5) to create 5 buckets based on run_time ordering
    NTILE(5) OVER (ORDER BY run_time) AS runtime_bucket
  FROM
    `{TABLE_ID}`
  WHERE
    state = 'SUCCEEDED' AND run_time IS NOT NULL
)
-- Summarize the daily counts for each bucket and tier
SELECT
  EXTRACT(DATE FROM createTime) AS job_date,
  runtime_bucket,
  CASE
    WHEN executor_compute_tier = 'premium' THEN 'Premium'
    ELSE 'Non-Premium'
  END AS compute_tier_type,
  COUNT(*) AS number_of_jobs
FROM
  JobBuckets
GROUP BY
  job_date,
  runtime_bucket,
  compute_tier_type
ORDER BY
  job_date,
  runtime_bucket,
  compute_tier_type;
"""

tier_breakdown_query = client.query(sql_tier_breakdown)
df_tier_breakdown = tier_breakdown_query.to_dataframe()

# Preview only the first 10 rows of the result.
print("Daily Job Counts Broken Down by Runtime Bucket and Compute Tier:")
tier_breakdown_preview = df_tier_breakdown.head(10)
display(tier_breakdown_preview)

In [None]:
# Split the per-day, per-bucket counts by tier type.
df_premium = df_tier_breakdown[df_tier_breakdown['compute_tier_type'] == 'Premium']
df_non_premium = df_tier_breakdown[df_tier_breakdown['compute_tier_type'] == 'Non-Premium']

# Dates and buckets seen in EITHER tier. Both pivots are aligned to these so
# that bar i in the top panel and bar i in the bottom panel are the same date:
# pandas bar plots place bars by position, and the panels share one x-axis.
all_job_dates = pd.Index(df_tier_breakdown['job_date'].drop_duplicates().sort_values())
all_runtime_buckets = pd.Index(df_tier_breakdown['runtime_bucket'].drop_duplicates().sort_values())


def _daily_bucket_counts(tier_rows: pd.DataFrame) -> pd.DataFrame:
    """Return a date x bucket table of job counts for one tier.

    The table is aligned to `all_job_dates` / `all_runtime_buckets`; any
    date/bucket combination without jobs for this tier is 0. A tier with no
    rows at all yields an all-zero table instead of an empty (unplottable) one.
    """
    if tier_rows.empty:
        return pd.DataFrame(0, index=all_job_dates, columns=all_runtime_buckets)
    counts = tier_rows.pivot(index='job_date', columns='runtime_bucket', values='number_of_jobs')
    return counts.reindex(index=all_job_dates, columns=all_runtime_buckets).fillna(0)


if df_tier_breakdown.empty:
    print("No succeeded jobs with a run_time were found; nothing to plot.")
else:
    pivot_premium = _daily_bucket_counts(df_premium)
    pivot_non_premium = _daily_bucket_counts(df_non_premium)

    # Two stacked panels sharing the date axis, one per tier.
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 12), sharex=True)
    bucket_legend_title = 'Runtime Bucket (1=Fastest, 5=Slowest)'

    # Plot for Premium Tier
    pivot_premium.plot(kind='bar', stacked=True, colormap='Oranges', ax=ax1)
    ax1.set_title('Daily Premium Tier Jobs by Runtime Bucket', fontsize=16)
    ax1.set_ylabel('Number of Jobs')
    ax1.legend(title=bucket_legend_title)

    # Plot for Non-Premium Tier
    pivot_non_premium.plot(kind='bar', stacked=True, colormap='Blues', ax=ax2)
    ax2.set_title('Daily Non-Premium Tier Jobs by Runtime Bucket', fontsize=16)
    ax2.set_xlabel('Date', fontsize=12)
    ax2.set_ylabel('Number of Jobs')
    ax2.legend(title=bucket_legend_title)

    # Rotate the date labels on the bottom panel explicitly rather than on
    # whichever axes happens to be current.
    plt.setp(ax2.get_xticklabels(), rotation=45, ha='right')
    fig.tight_layout()
    plt.show()

    print("\nInsight: By separating the charts, you can easily compare workload patterns. For example, you might discover that your longest-running jobs (bucket 5) are predominantly run on non-premium hardware. This could be an opportunity for optimization: testing if running these slow jobs on the premium tier would be more cost-effective by significantly reducing their runtime.")