# Carga de datos utilizando PySpark

In [6]:
import os
import shutil

## dbutils
biblioteca de utilidades proporcionada por Databricks que facilita la interacción y el acceso a diversos servicios y recursos dentro del entorno de Databricks.

### dbutils.fs
Proporciona funciones para interactuar con el sistema de archivos de Databricks.

In [None]:
path = '/FileStore/tables'

In [4]:

path = 'data/'

# Create the directory if it doesn't exist
if not os.path.exists(path):
    os.makedirs(path)

# List contents of the directory
files = os.listdir(path)

In [None]:
dbutils.fs.ls(path)

Out[4]: [FileInfo(path='dbfs:/FileStore/tables/Muestra___Supertienda.csv', name='Muestra___Supertienda.csv', size=2196839, modificationTime=1672253282000),
 FileInfo(path='dbfs:/FileStore/tables/Sample___Superstore.xlsx', name='Sample___Superstore.xlsx', size=1215519, modificationTime=1680137551000),
 FileInfo(path='dbfs:/FileStore/tables/Sample___Superstore___Orders.csv', name='Sample___Superstore___Orders.csv', size=2348309, modificationTime=1680137828000),
 FileInfo(path='dbfs:/FileStore/tables/bronce/', name='bronce/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/data-1.csv', name='data-1.csv', size=2288519, modificationTime=1687378596000),
 FileInfo(path='dbfs:/FileStore/tables/data-2.csv', name='data-2.csv', size=2288519, modificationTime=1687378611000),
 FileInfo(path='dbfs:/FileStore/tables/data-3.csv', name='data-3.csv', size=2288519, modificationTime=1687395385000),
 FileInfo(path='dbfs:/FileStore/tables/data.csv', name='data.csv', size=2288519, modific

In [None]:
dbutils.fs.mkdirs(path + "/bronce")
dbutils.fs.mkdirs(path + "/silver")
dbutils.fs.mkdirs(path + "/gold")

Out[3]: True

In [5]:
# Create directories if they don't exist
os.makedirs(os.path.join(path, 'bronce'), exist_ok=True)
os.makedirs(os.path.join(path, 'silver'), exist_ok=True)
os.makedirs(os.path.join(path, 'gold'), exist_ok=True)

In [None]:
src = path + '/data.csv'
dest = path + '/bronce/data.csv'
dbutils.fs.cp(src, dest)

Out[5]: True

In [7]:
src = os.path.join(path, 'data.csv')
dest = os.path.join(path, 'bronce', 'data.csv')

# Ensure destination directory exists
os.makedirs(os.path.dirname(dest), exist_ok=True)

# Copy the file
try:
    shutil.copy(src, dest)
    print(f"File copied from {src} to {dest}")
except FileNotFoundError as e:
    print(f"Error: {e}")

File copied from data/data.csv to data/bronce\data.csv


In [None]:
archivo = path + '/bronce/data.csv'
dbutils.fs.head(archivo)

[Truncated to first 65536 bytes]
Out[7]: '\ufeffRow ID,Order ID,Order Date,Ship Date,Ship Mode,Customer ID,Customer Name,Segment,Country,City,State,Postal Code,Region,Product ID,Category,Sub-Category,Product Name,Sales,Quantity,Discount,Profit\r\n1,CA-2016-152156,11/8/2016,11/11/2016,Second Class,CG-12520,Claire Gute,Consumer,United States,Henderson,Kentucky,42420,South,FUR-BO-10001798,Furniture,Bookcases,Bush Somerset Collection Bookcase,261.96,2,0,41.9136\r\n2,CA-2016-152156,11/8/2016,11/11/2016,Second Class,CG-12520,Claire Gute,Consumer,United States,Henderson,Kentucky,42420,South,FUR-CH-10000454,Furniture,Chairs,"Hon Deluxe Fabric Upholstered Stacking Chairs, Rounded Back",731.94,3,0,219.582\r\n3,CA-2016-138688,6/12/2016,6/16/2016,Second Class,DV-13045,Darrin Van Huff,Corporate,United States,Los Angeles,California,90036,West,OFF-LA-10000240,Office Supplies,Labels,Self-Adhesive Address Labels for Typewriters by Universal,14.62,2,0,6.8714\r\n4,US-2015-108966,10/11/2015,10/18/2015,Sta

In [None]:
dbutils.fs.rm(archivo, recursive=True)

### dbutils.widgets
Permite crear y trabajar con widgets interactivos en los notebooks.

In [None]:
# Definición de los widgets
dbutils.widgets.text('superficie_input', '100', 'Superficie de la vivienda')

# Lectura del valor del widget
superficie_input = float(dbutils.widgets.get('superficie_input'))

superficie_input

Out[11]: 20000.0

In [None]:
# Definición de los widgets
algoritmos = ['Logistic Regression', 'Naive Bayes']
dbutils.widgets.dropdown('algoritmo_input', 'Logistic Regression', algoritmos, 'Algoritmo de PLN')

In [None]:
# Lectura del valor del widget
algoritmo_input = dbutils.widgets.get('algoritmo_input')
algoritmo_input

Out[14]: 'Naive Bayes'

### dbutils.notebook
Proporciona funciones para interactuar con otros notebooks en Databricks.

In [None]:
path_notebook = ''
timeoutSeconds = 600
dbutils.notebook.run(path_notebook, timeoutSeconds)

In [None]:
dbutils.notebook.exit(result)

### dbutils.help
Puedes explorar más funciones y obtener información detallada en la documentación oficial de Databricks o utilizando el comando

In [None]:
dbutils.help()

## spark.read 
Función de entrada para carga de datos

### Lectura de archivos CSV

In [8]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Lectura de un archivo CSV local
df_csv = spark.read.csv(path + '/data.csv', header=True, inferSchema=True)

# Lectura de un archivo CSV desde una URL
#df_csv_url = spark.read.csv('https://example.com/archivo.csv', header=True, inferSchema=True)


#### heder & inferSchema

- header=True: Esta opción se utiliza cuando el archivo de datos contiene una línea de encabezado en la que se encuentran los nombres de las columnas. Al establecer header=True, Spark interpretará la primera línea del archivo como el encabezado y asignará esos nombres a las columnas correspondientes en el DataFrame resultante. Esto facilita el acceso a los datos utilizando nombres de columna descriptivos en lugar de utilizar índices numéricos.

- inferSchema=True: Esta opción se utiliza para inferir automáticamente los tipos de datos de cada columna en el DataFrame. Al establecer inferSchema=True, Spark leerá una muestra de los datos y realizará una estimación de los tipos de datos adecuados para cada columna. Esto puede ser útil cuando no se especifica explícitamente el esquema del archivo de datos y se desea que Spark determine automáticamente los tipos de datos más apropiados.

Ambas opciones (header e inferSchema) son útiles para simplificar el proceso de lectura de datos y asegurarse de que el DataFrame resultante tenga la estructura y los tipos de datos adecuados para su posterior procesamiento y análisis.

In [None]:
# Lectura de un archivo JSON local
df_json = spark.read.json('/ruta/al/archivo.json')

# Lectura de un archivo Parquet local
df_parquet = spark.read.parquet('/ruta/al/archivo.parquet')

# Lectura de un archivo AVRO local
df_avro = spark.read.format('avro').load('/ruta/al/archivo.avro')

# Lectura de datos desde un sistema de almacenamiento en la nube (S3)
df_s3 = spark.read.csv('s3://bucket/archivo.csv', header=True, inferSchema=True)


### Lectura de bases de datos JDBC

In [None]:
# Definir las opciones de conexión a la base de datos MySQL
options = {
    "url": "jdbc:mysql://localhost:3306/pysparkdb",
    "driver": "org.mariadb.jdbc.Driver",
    "dbtable": "authors",
    "user": "#####",
    "password": "@@@@@"
}

# Leer los datos de la tabla "authors" desde la base de datos MySQL
dataframe_mysql = spark.read.format("jdbc").options(**options).load()