In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import trim, col, regexp_replace, split, lower

try:
    spark = SparkSession.builder \
        .appName("Proyecto SparkSQL") \
        .config("spark.jars.packages", "org.postgresql:postgresql:42.7.4") \
        .config("spark.driver.extraClassPath", "/path/to/postgresql-42.7.4.jar") \
        .config("spark.executor.extraClassPath", "/path/to/postgresql-42.7.4.jar") \
        .getOrCreate()
    print("SparkSession creada:", spark.version)
except Exception as e:
    print("Error al crear SparkSession:", str(e))


# Load CSV file OIJ.CSV
df_oij = spark.read.csv("C:\\CODE\\SPARK\\spark\\data\\OIJ.csv", header=True, inferSchema=True)

# Load CSV file INEC.CSV
df_inec = spark.read.csv("C:\\CODE\\SPARK\\spark\\data\\INEC.csv", header=True, inferSchema=True)

def district_lowercase(df, column_name):
    """
    Pasa los datos de una columna a minúsculas 
    
    Args:
        df: DataFrame de Spark
        column_name: Nombre de la columna a limpiar
        
    Returns:
        DataFrame con la columna con sus datos en minúscula
    """
    # usando la funcion lower, modifica los datos de la columna con la información en minúscula
    df_lower = df.withColumn(
        column_name,
        lower(column_name))
    
    
    return df_lower



# Uso:
# Para OIJ.csv:
#df_oij_clean = clean_district_column(df_oij, 'Distrito')

#lower Case

df_oij_low = district_lowercase(df_oij, 'Distrito')




# Show the first few rows of the DataFrame
df_oij_low.show()

# Print the DataFrame schema
#df_oij_low.printSchema()

SparkSession creada: 3.5.3
+------+-----------+----------+-------------------+----------+--------------------+-------------+------------+-----------+----------+-------------+----+
|Delito|  SubDelito|     Fecha|            Victima|SubVictima|                Edad|         Sexo|Nacionalidad|  Provincia|    Canton|     Distrito|_c11|
+------+-----------+----------+-------------------+----------+--------------------+-------------+------------+-----------+----------+-------------+----+
|ASALTO|ARMA BLANCA|2011-01-10|18:00:00 - 20:59:59|   PERSONA|    PEATON [PERSONA]|Mayor de edad|      HOMBRE|  NICARAGUA|  SAN JOSE|   alajuelita|NULL|
|ASALTO|ARMA BLANCA|2011-02-02|03:00:00 - 05:59:59|   PERSONA|    PEATON [PERSONA]|Mayor de edad|      HOMBRE| COSTA RICA|   CARTAGO|     oreamuno|NULL|
|ASALTO|ARMA BLANCA|2011-10-23|21:00:00 - 23:59:59|   PERSONA|    PEATON [PERSONA]|Mayor de edad|      HOMBRE| COSTA RICA|   HEREDIA|      heredia|NULL|
|ASALTO|ARMA BLANCA|2011-05-19|18:00:00 - 20:59:59|   P

In [None]:
import psycopg
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType, BooleanType

# Define tus parámetros de conexión
conn_params = {
    'dbname': 'datos',
    'user': 'postgres',
    'password': 'admin',
    'host': 'localhost',
    'port': '5432'
}

# Establece la conexión
try:
    conn = psycopg.connect(**conn_params)
    print("Conexión exitosa")
except Exception as e:
    print(f"Ocurrió un error: {e}")


  """
   Crea un string con el script correspondiente para crear una tabla en la base de datos a partir
   de los encabezados del dataframe deseado
    
    Args:
        df: DataFrame de Spark
        table_name: nombre de la tabla que se quiere crear
        
    Returns:
        String con el script para crear la tabla en postgresql
    """

def create_new_table_sql(df, table_name):
    schema = df.schema
    columns = []
    for field in schema.fields:
        # Clasifica los tipos seg'un el dato que se le ingrese en funcion de los tipos de PostgreSQL
        if isinstance(field.dataType, StringType):
            postgres_type = "VARCHAR"
        elif isinstance(field.dataType, IntegerType):
            postgres_type = "INTEGER"
        elif isinstance(field.dataType, FloatType):
            postgres_type = "FLOAT"
        elif isinstance(field.dataType, DoubleType):
            postgres_type = "DOUBLE PRECISION"
        elif isinstance(field.dataType, BooleanType):
            postgres_type = "BOOLEAN"
        else:
            postgres_type = "TEXT"  # tipo por default

        columns.append(f"{field.name} {postgres_type}")
    
    columns_str = ", ".join(columns)
    create_table_sql = f"CREATE TABLE IF NOT EXISTS {table_name} ({columns_str});"
    return create_table_sql

table_name = "oij"
create_table_sql = create_new_table_sql(df_oij_low, table_name)

with psycopg.connect(**conn_params) as conn:
    with conn.cursor() as cur:
        cur.execute(create_table_sql)
    conn.commit()

conn.close()

Conexión exitosa


In [None]:
"""
    inserta los datos de un dataframe en una tabla de la base de datos
    
    Args:
        df: DataFrame de Spark
        table_name: nombre de la tabla a insertar los datos
        conn_params: informacion de la conexion a la base de datos
        
    
    """

def insert_data_to_postgres(df, table_name, conn_params):
    # Convierte el dataframe en una lista de tuplas
    data = [tuple(row) for row in df.collect()]
    
    # Obtiene los nombres de las columnas del dataframe, correspondientes a las columnas en postgresql
    columns = ", ".join(df.columns)
    placeholders = ", ".join(["%s"] * len(df.columns))  # For psycopg3

    # se construye el script en SQL
    insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"

    # aca hace la conexion e inserta los datos
    with psycopg.connect(**conn_params) as conn:
        with conn.cursor() as cur:
            cur.executemany(insert_sql, data)
        conn.commit()

insert_data_to_postgres(df_oij_low, table_name,conn_params)