In [2]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

# *NVIDIA GPU GUIDE*

In [None]:
# Initialize Spark session for the master node
# 1 - Use all available local cores for the master node
# 2 - Set the application name for the master node
# 3 - Configure the driver and executor memory and GPU settings
import subprocess
import json

# Configure Spark with GPU settings for RTX 4090 (24GB) and A5000 (24GB)
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("ERROR 418 - I'M A TEAPOT") \
    .config("spark.driver.memory", "50g") \
    .config("spark.executor.memory", "50g") \
    .config("spark.driver.maxResultSize", "50g") \
    .config("spark.rapids.sql.enabled", "true") \
    .config("spark.rapids.memory.pinnedPool.size", "24G") \
    .config("spark.rapids.sql.concurrentGpuTasks", "2") \
    .config("spark.rapids.memory.gpu.pooling.enabled", "true") \
    .config("spark.rapids.memory.gpu.allocFraction", "0.95") \
    .config("spark.rapids.sql.explain", "ALL") \
    .config("spark.executor.resource.gpu.amount", "2") \
    .config("spark.task.resource.gpu.amount", "0.25") \
    .config("spark.rapids.sql.incompatibleOps.enabled", "true") \
    .config("spark.rapids.memory.host.spillStorageSize", "64G") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.localShuffleReader.enabled", "true") \
    .config("spark.rapids.memory.gpu.maxAllocFraction", "0.95") \
    .config("spark.rapids.sql.batchSizeBytes", "512M") \
    .config("spark.rapids.sql.reader.batchSizeRows", "100000") \
    .config("spark.rapids.sql.variableRowGroupSize.enabled", "true") \
    .getOrCreate()

gpus = tf.config.list_logical_devices('GPU')
if gpus:
  # Replicate your computation on multiple GPUs
  c = []
  for gpu in gpus:
    with tf.device(gpu.name):
      a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
      b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
      c.append(tf.matmul(a, b))

  with tf.device('/CPU:0'):
    matmul_sum = tf.add_n(c)

  print(matmul_sum)

# Set log level to INFO
spark.sparkContext.setLogLevel("INFO")
# Configure logging
log4jLogger = spark._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger(__name__)

def custom_logger(level, message):
    color = 'white'
    if level == "INFO":
        color = 'cyan'
    elif level == "SUCCESS":
        color = 'green'
    elif level == "ERROR":
        color = 'red'
    elif level == "ACTION":
        color = 'blue'
    elif level == "PROGRESS" or level == "WARNING":
        color = 'yellow'
    elif level == "FINAL":
        color = 'magenta'
    logger.info(colored(f"SPARK: {level} - {message}", color))

# Set the environment variable for memory allocator
os.environ['TF_GPU_ALLOCATOR'] = 'cuda_malloc_async'
os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "0, 1"

def initialize_cuda():
    cuda = ctypes.CDLL('libcuda.so')
    cuInit = cuda.cuInit
    cuInit.restype = ctypes.c_int
    cuInit.argtypes = [ctypes.c_uint]
    cuDeviceGetCount = cuda.cuDeviceGetCount
    cuDeviceGetCount.restype = ctypes.c_int
    cuDeviceGetCount.argtypes = [ctypes.POINTER(ctypes.c_int)]
    cuDeviceGet = cuda.cuDeviceGet
    cuDeviceGet.restype = ctypes.c_int
    cuDeviceGet.argtypes = [ctypes.POINTER(ctypes.c_int), ctypes.c_int]

    res = cuInit(0)
    if res != 0:
        raise RuntimeError("Failed to initialize CUDA")
    device_count = ctypes.c_int()
    res = cuDeviceGetCount(ctypes.byref(device_count))
    if res != 0:
        raise RuntimeError("Failed to get device count")
    devices = []
    for i in range(device_count.value):
        device = ctypes.c_int()
        res = cuDeviceGet(ctypes.byref(device), i)
        if res != 0:
            raise RuntimeError(f"Failed to get device {i}")
        devices.append(device.value)
    return devices

def allocate_gpu_memory(size):
    cuda = ctypes.CDLL('libcuda.so')
    cuda_malloc = cuda.cuMemAlloc
    cuda_malloc.restype = ctypes.c_int
    cuda_malloc.argtypes = [ctypes.POINTER(ctypes.c_ulonglong), ctypes.c_ulonglong]
    ptr = ctypes.c_ulonglong()
    res = cuda_malloc(ctypes.byref(ptr), size)
    if res != 0:
        raise RuntimeError("Failed to allocate GPU memory")
    return ptr.value

def free_gpu_memory(ptr):
    cuda = ctypes.CDLL('libcuda.so')
    cuda_free = cuda.cuMemFree
    cuda_free.restype = ctypes.c_int
    cuda_free.argtypes = [ctypes.c_ulonglong]
    res = cuda_free(ptr)
    if res != 0:
        raise RuntimeError("Failed to free GPU memory")

# Check GPU memory usage
def check_gpu_memory():
    os.system('nvidia-smi')

# Initialize CUDA
cuda_devices = initialize_cuda()

# Check available GPU memory
check_gpu_memory()

# Ensure TensorFlow uses the GPU by setting memory growth
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        print(f"Error setting memory growth: {e}")

# Check and print the number of GPUs available
num_gpus = len(tf.config.list_physical_devices('GPU'))
print(f"Num GPUs Available: {num_gpus}")

# Perform a simple TensorFlow operation to verify GPU usage
with tf.device('/GPU:0'):
    a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]], dtype=tf.float32)
    b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]], dtype=tf.float32)
    c = tf.matmul(a, b)
print(f"Result of matrix multiplication on GPU:\n{c.numpy()}")

def get_gpu_memory_usage():
    try:
        result = subprocess.run(['nvidia-smi', '--query-gpu=memory.used', '--format=csv,nounits,noheader'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        memory_used = int(result.stdout.decode('utf-8').strip())
        return memory_used
    except Exception as e:
        print(f"Failed to get GPU memory usage: {e}")
        return None

def freegpu():
    try:
        # Get GPU memory usage before freeing
        memory_before = get_gpu_memory_usage()

        result = subprocess.run(['sudo', 'fuser', '-v', '/dev/nvidia*'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        for line in result.stdout.decode('utf-8').split('\n'):
            if '/dev/nvidia' in line:
                pid = int(line.split()[-1])
                os.kill(pid, 9)

        # Get GPU memory usage after freeing
        memory_after = get_gpu_memory_usage()

        if memory_before is not None and memory_after is not None:
            memory_freed = memory_before - memory_after
            print(f"GPU memory has been freed. {memory_freed} MB of GPU memory was released.")
        else:
            print("GPU memory has been freed.")
    except Exception as e:
        print(f"Failed to free GPU memory: {e}")

# Set the environment variable for XLA flags
os.environ['XLA_FLAGS'] = '--xla_gpu_strict_conv_algorithm_picker=false'

# ======================================================================================================
# ==============================[SETUP MIRROREDSTRATEGY WITHOUT NCCL]===================================
# ======================================================================================================

# ------------------------------------------------------------------------------------------------------
# Define the strategy with HierarchicalCopyAllReduce to avoid NCCL
# ------------------------------------------------------------------------------------------------------
strategy = tf.distribute.MirroredStrategy(
    devices=["/gpu:0", "/gpu:1"],
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce()
)

print(f"Number of devices under strategy: {strategy.num_replicas_in_sync}")