In [0]:
# Lista el contenido del directorio raíz en DBFS (cluster distribuido, donde persisten los datos)
root_files = dbutils.fs.ls('/')
for file in root_files:
    print(file.path)

dbfs:/FileStore/
dbfs:/databricks-datasets/
dbfs:/databricks-results/
dbfs:/tmp/


In [0]:
# Lista el contenido del directorio /FileStore
filestore_files = dbutils.fs.ls("/FileStore/shared_uploads/cristian.neumann@usach.cl")
for file in filestore_files:
    print(file.path)

dbfs:/FileStore/shared_uploads/cristian.neumann@usach.cl/kaggle.json


In [0]:
# Leer el contenido de un archivo
dbutils.fs.head("dbfs:/FileStore/shared_uploads/cristian.neumann@usach.cl/kaggle.json")

Out[11]: '{"username":"cristianneumann","key":"51956eb6263a97a83b2c65c47447a264"}'

In [0]:
# Instalar librerias de Kaggle
%pip install kaggle

Python interpreter will be restarted.
Collecting kaggle
  Downloading kaggle-1.6.14.tar.gz (82 kB)
Collecting certifi>=2023.7.22
  Downloading certifi-2024.7.4-py3-none-any.whl (162 kB)
Collecting tqdm
  Downloading tqdm-4.66.4-py3-none-any.whl (78 kB)
Collecting python-slugify
  Downloading python_slugify-8.0.4-py2.py3-none-any.whl (10 kB)
Collecting text-unidecode>=1.3
  Downloading text_unidecode-1.3-py2.py3-none-any.whl (78 kB)
Building wheels for collected packages: kaggle
  Building wheel for kaggle (setup.py): started
  Building wheel for kaggle (setup.py): finished with status 'done'
  Created wheel for kaggle: filename=kaggle-1.6.14-py3-none-any.whl size=105136 sha256=4320da42e0bd471242ed7599a48ae1cd36ebf76d5d9a9faef60d2776c9b10464
  Stored in directory: /root/.cache/pip/wheels/23/f1/ec/3f9079ada5cc1218f9accf05ccca9f0a57533191e49d76e3f6
Successfully built kaggle
Installing collected packages: text-unidecode, certifi, tqdm, python-slugify, kaggle
  Attempting uninstall: certifi

In [0]:
# Confirgurar credenciales de Kaggle
import os
import json

# Cargar el archivo kaggle.json que has subido a DBFS
dbutils.fs.cp("dbfs:/FileStore/shared_uploads/cristian.neumann@usach.cl/kaggle.json", "file:/root/.kaggle/kaggle.json")

# Establecer los permisos adecuados
os.chmod("/root/.kaggle/kaggle.json", 0o600)

# Verificar que las credenciales estén configuradas correctamente
with open("/root/.kaggle/kaggle.json", "r") as f:
    kaggle_credentials = json.load(f)
    print(kaggle_credentials)

{'username': 'cristianneumann', 'key': '51956eb6263a97a83b2c65c47447a264'}


In [0]:
# Listar los datasets de kaggle disponibles en su API
import kaggle

# Funcion que transforma el contenido de dataset.size (string) en número
def size_to_bytes(size_str):
    size_str = size_str.strip().upper()
    size_units = {"KB": 1024, "MB": 1024**2, "GB": 1024**3, "TB": 1024**4}
    if size_str[-2:] in size_units:
        return float(size_str[:-2]) * size_units[size_str[-2:]]
    elif size_str[-1] == "B":
        return float(size_str[:-1])
    else:
        return float(size_str)

# Buscar datasets grandes. Probamos con la etiqueta "parquet", pudiendo existir muchas otras.
datasets = kaggle.api.dataset_list(search="parquet")

# Filtrar datasets por tamaño (mostrando aquellos mayores a 0,5 GB)
large_datasets = [dataset for dataset in datasets if size_to_bytes(dataset.size) > (1024**3)/2]

# Mostrar los datasets grandes
for dataset in large_datasets:
    print(dataset, dataset.size)

ruchi798/parquet-files-amexdefault-prediction 22GB
odins0n/amex-parquet 9GB
robikscube/ubiquant-parquet 13GB
raddar/amex-data-integer-dtypes-parquet-format 4GB
rohanrao/riiid-train-data-multiple-formats 4GB
robikscube/ai4code-parquet-tabular 785MB
ravi20076/optiver-memoryreduceddatasets 2GB
jjinho/wikipedia-20230701 12GB
alvinleenh/tps-rocket-league-data-float16-parquet-format 2GB
columbia2131/otto-chunk-data-inparquet-format 2GB
ryati131457/riiid-parquet-files 564MB
samuelcortinhas/tps-nov-22-dataset-in-parquet-format 1GB
mustafakeser4/tpsoct22-parquet 3GB
cdeotte/otto-validation 1GB
titericz/leap-dataset-giba 16GB
pedrocouto39/jane-street-market-train-data-best-formats 10GB


In [0]:
# Buscar el dataset "amex-data-integer-dtypes-parquet-format" del usuario "raddar"
dataset = kaggle.api.dataset_list(search="raddar/amex-data-integer-dtypes-parquet-format")
print(dataset, dataset[0].size)

[raddar/amex-data-integer-dtypes-parquet-format] 4GB


In [0]:
# Importar el dataset "amex-data-integer-dtypes-parquet-format" en ruta del nodo local /tmp/wiki_data/. Esto, pues la API de Kaggle descargará el archivo al nodo local siempre.
import kaggle
kaggle.api.dataset_download_files('raddar/amex-data-integer-dtypes-parquet-format', path='/tmp/wiki_data/', unzip=True)

Dataset URL: https://www.kaggle.com/datasets/raddar/amex-data-integer-dtypes-parquet-format


In [0]:
# Lista el contenido del directorio Local: file:/tmp/wiki_data/
root_local_files = dbutils.fs.ls('file:/tmp/wiki_data/')
for file in root_local_files:
    print(file.path)

In [0]:
# Mover el dataset descargado desde el sistema de archivos local al distribuido (DBFS), para que persista
dbutils.fs.cp("file:/tmp/wiki_data/", "dbfs:/tmp/wiki_data/", recurse=True)

Out[13]: True

In [0]:
# Volvemos a listar el contenido del directorio raíz en DBFS para verificar que aparezca el path del dataset copiado
root_files = dbutils.fs.ls('/')
for file in root_files:
    print(file.path)

dbfs:/FileStore/
dbfs:/databricks-datasets/
dbfs:/databricks-results/
dbfs:/tmp/


# ACLARACIÓN 
- Local Filesystem: Cuando usas "file:", te estás refiriendo al sistema de archivos local del nodo en el que se está ejecutando tu código. Esto es similar a cómo manejarías archivos en un entorno de escritorio o servidor normal. Los archivos almacenados en el sistema de archivos local son temporales y pueden no estar disponibles después de que finaliza la sesión o el clúster se reinicia.

- Databricks File System (dbfs:): DBFS es un sistema de archivos distribuido que está disponible en todos los nodos de tu clúster de Databricks. Esto significa que los archivos en DBFS pueden ser accedidos desde cualquier nodo, lo cual es crucial para la ejecución de trabajos distribuidos. Los archivos almacenados en DBFS son persistentes y están disponibles incluso después de que finaliza la sesión o se reinicia el clúster. Esto hace que DBFS sea ideal para almacenar datos que necesitas reutilizar o compartir entre sesiones. DBFS está optimizado para trabajar con Apache Spark, lo que facilita la lectura y escritura de grandes conjuntos de datos distribuidos.

In [0]:
# Lista el contenido del directorio dbfs:/tmp/wiki_data/
files = dbutils.fs.ls("dbfs:/tmp/wiki_data/")
for file in files:
    print(file.path)

dbfs:/tmp/wiki_data/test.parquet
dbfs:/tmp/wiki_data/train.parquet


In [0]:
# Cargar los datos de entrenamiento en un DataFrame de Spark
path = "dbfs:/tmp/wiki_data/train.parquet"
df_train = spark.read.format('parquet').options(header=True,inferSchema=True).load(path)

In [0]:
# Obtenemos la cantidad de filas y columnas del DF de entrenamiento
print((df_train.count(), len(df_train.columns)))

(5531451, 190)


In [0]:
# Conociendo el nombre de las variables categóricas, las separo de las variables numéricas.
categoricas = ['B_30', 'B_38', 'D_114', 'D_116', 'D_117', 'D_120', 'D_126', 'D_63', 'D_64', 'D_66', 'D_68']
numericas = [col for col in df_train.columns if col not in categoricas]
print(len(categoricas), len(numericas))

11 179


In [0]:
# Obtengo las etiquetas desde AWS S3.
ACCESS_KEY = "AKIA6NTIDLSP7UKVPAJJ"
SECRET_KEY = "ExBmrC7VGh6MoBCIKkGdUQ5uvOev2uQbQmVPF7ZL"
ENCODED_SECRET_KEY = SECRET_KEY.replace("/", "%2F")
AWS_BUCKET_NAME = "streaming-bucket-1"
S3_FILE_PATH = f"s3a://{ACCESS_KEY}:{ENCODED_SECRET_KEY}@{AWS_BUCKET_NAME}/train_labels.csv"
DBFS_FILE_PATH = "dbfs:/tmp/wiki_data/train_labels.csv"

# Copiar el archivo desde S3 a DBFS
dbutils.fs.cp(S3_FILE_PATH, DBFS_FILE_PATH)

Out[30]: True

In [0]:
# Lista el contenido del directorio dbfs:/tmp/wiki_data/
files = dbutils.fs.ls("dbfs:/tmp/wiki_data/")
for file in files:
    print(file.path)

dbfs:/tmp/wiki_data/test.parquet
dbfs:/tmp/wiki_data/train.parquet
dbfs:/tmp/wiki_data/train_labels.csv


In [0]:
# Leer el archivo CSV con las etiquetas desde DBFS
csv_file_path = "dbfs:/tmp/wiki_data/train_labels.csv"
df_labels = spark.read.csv(csv_file_path, header=True, inferSchema=True, sep=";")

# Mostrar los datos del DataFrame con las etiquetas
df_labels.show(3)

+--------------------+------+
|         customer_ID|target|
+--------------------+------+
|0000099d6bd597052...|     0|
|00000fd6641609c6e...|     0|
|00001b22f846c82c5...|     0|
+--------------------+------+
only showing top 3 rows



In [0]:
# Count of target variable
fig, ax = plt.subplots(figsize=(10,5))
sns.countplot(x=train_y.target)
plt.show()