In [18]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, DateType
from pyspark.sql.functions import col, when


In [19]:

# Define el esquema de los datos
schema = StructType([
    StructField('PERCOD', IntegerType(), True),
    StructField('CONCOD', IntegerType(), True),
    StructField('ABOCOD', IntegerType(), True),
    StructField('CONFCHFINV', DateType(), True),
    StructField('CONFCHINIV', DateType(), True),
    StructField('CONSTSHAB', StringType(), True),
    StructField('CONSTSCON', StringType(), True),
    StructField('PAQCOD', IntegerType(), True),
    StructField('INGCOD', IntegerType(), True),
    StructField('SISCOD', IntegerType(), True),
    StructField('GRPAFICOD', IntegerType(), True),
    StructField('CONFCHULTF', DateType(), True),
    StructField('CONFCHING', DateType(), True),
    StructField('CONSTSIMP', StringType(), True),
    StructField('CONFCHINS', DateType(), True),
    StructField('CNVABOFCHI', DateType(), True),
    StructField('TPOABOCOD', IntegerType(), True),
    StructField('POLCOMCOD', IntegerType(), True),
    StructField('CONCTLNRO', IntegerType(), True),
    StructField('PROCOD', StringType(), True),
    StructField('CONCNTPAQ', IntegerType(), True),
    StructField('CONFECSIS', DateType(), True),
    StructField('CONUSRID', StringType(), True),
    StructField('PAQFLGTPO', StringType(), True),
    StructField('CDEPNRO', IntegerType(), True),
    StructField('CALQFCHDEV', DateType(), True),
    StructField('CALQSTSDEV', StringType(), True),
    StructField('CALQNROCON', StringType(), True),
    StructField('CALQIMP', FloatType(), True),
    StructField('ALQCTLNRO', IntegerType(), True),
    StructField('ALQCTLSER', StringType(), True),
    StructField('CDGARSUCID', IntegerType(), True),
    StructField('DECCTLNRO', IntegerType(), True),
    StructField('DECCTLSER', StringType(), True),
    StructField('CDEPSTS', StringType(), True),
    StructField('CDGARNRORE', StringType(), True),
    StructField('CDGARNROCO', StringType(), True),
    StructField('CDGARFCHDE', DateType(), True),
    StructField('CDGARIMPDE', FloatType(), True),
    StructField('CDGARFCH', DateType(), True),
    StructField('CDGARIMP', FloatType(), True),
    StructField('CDEPTPO', StringType(), True),
    StructField('CSUCIDDEV', IntegerType(), True),
    StructField('CONFCHDXC', DateType(), True),
    StructField('PROMCOD', IntegerType(), True),
    StructField('CONCBOCOD', IntegerType(), True),
    StructField('GCSER', IntegerType(), True),
    StructField('CONFCHSER', DateType(), True),
    StructField('CONDECPPV', StringType(), True),
    StructField('ABOSUCID', IntegerType(), True),
    StructField('CONFLG', StringType(), True),
    StructField('IPROCOD', IntegerType(), True),
    StructField('UCONCTRLCOD', IntegerType(), True),
    StructField('CLIDCALCOD', IntegerType(), True),
    StructField('CONFLGFAC', StringType(), True),
    StructField('CONACCION', StringType(), True),
    StructField('CONAGEFIN', IntegerType(), True),
    StructField('CONAGEINI', IntegerType(), True),
    StructField('CONFCHAGE', DateType(), True),
    StructField('CONUSUACCION', StringType(), True),
    StructField('CONAGEUSU', StringType(), True),
    StructField('CONUSUCOM', IntegerType(), True),
    StructField('CONNROFOL', StringType(), True),
    StructField('CONORDOBS', StringType(), True),
    StructField('CONFLGIMP', StringType(), True),
    StructField('CONCAMNOM', StringType(), True),
    StructField('CONCABFLG', StringType(), True),
    StructField('CONRECAUTFLG', StringType(), True),
    StructField('CONRECFLG', StringType(), True),
    StructField('RECONVERSION2018', StringType(), True),
    StructField('RECONVERSION2021', StringType(), True)
])



In [20]:
# Inicializa SparkSession
spark = SparkSession.builder \
    .appName("Transformando CSV con Spark") \
    .getOrCreate()

# Carga el CSV con el esquema definido
df = spark.read.csv("Tabla ConAbo completa.csv", header=True, schema=schema)

RuntimeError: Java gateway process exited before sending its port number

In [None]:
columnas_datetime = ['CONFCHFINV', 'CONFCHINIV', 'CONFCHULTF', 'CONFCHING', 'CONFCHINS', 'CNVABOFCHI', 'CALQFCHDEV', 'CDGARFCHDE', 'CDGARFCH', 'CONFCHDXC', 'CONFCHSER', 'CONFCHAGE']


In [None]:
df.printSchema()

In [None]:
total_filas = df.count()
print("Número total de filas:", total_filas)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
from pyspark.sql.types import DateType

# Define una función para verificar si una fecha es anterior a 1900
def check_fecha(fecha):
    if fecha.year < 1900:
        return None
    else:
        return fecha

# Inicializa SparkSession
spark = SparkSession.builder \
    .appName("Transformando Fechas en DataFrame") \
    .getOrCreate()

# Carga el CSV con el esquema definido (suponiendo que ya has cargado los datos en el DataFrame 'df')
# df = ...

# Define las columnas de fechas
columnas_datetime = ['CONFCHFINV', 'CONFCHINIV', 'CONFCHULTF', 'CONFCHING', 'CONFCHINS', 'CNVABOFCHI', 'CALQFCHDEV', 'CDGARFCHDE', 'CDGARFCH', 'CONFCHDXC', 'CONFCHSER', 'CONFCHAGE']

# Aplica la función 'check_fecha' a cada columna de fecha para eliminar las fechas anteriores a 1900
for col_fecha in columnas_datetime:
    df = df.withColumn(col_fecha, expr(f"CASE WHEN YEAR({col_fecha}) >= 1900 THEN {col_fecha} ELSE NULL END").cast(DateType()))

# Muestra algunas filas del DataFrame transformado
df.show(5)


In [None]:
# Comprueba si hay fechas menores a 1900 en las columnas de fecha
fechas_menores_1900 = df.filter(
    # Utiliza la función 'expr' para evaluar la condición en cada columna de fecha
    expr(" OR ".join([f"{col_fecha} < '1900-01-01'" for col_fecha in columnas_datetime]))
)

# Cuenta el número de filas con fechas menores a 1900
num_fechas_menores_1900 = fechas_menores_1900.count()

if num_fechas_menores_1900 == 0:
    print("No hay fechas menores a 1900 en las columnas de fecha.")
else: 
    print(f"Hay {num_fechas_menores_1900} fechas menores a 1900 en las columnas de fecha.")

In [None]:
df.write.parquet("ruta_del_directorio_parquet", mode="overwrite")

In [None]:
from pyspark.sql import SparkSession

# Inicializa SparkSession
spark = SparkSession.builder \
    .appName("Transformando DataFrame y Exportando a Parquet") \
    .config("spark.sql.parquet.writeLegacyFormat", "true") \
    .getOrCreate()

# Carga el DataFrame o realiza las transformaciones necesarias
# df = ...

# Guarda el DataFrame en formato Parquet
# Reemplaza 'ruta_del_directorio_parquet' con la ubicación y nombre del directorio que desees para los archivos Parquet
df.write.parquet("ruta_del_directorio_parquet", mode="overwrite")

In [None]:
from pyspark.sql import SparkSession

# Crear una instancia de SparkSession
spark = SparkSession.builder \
    .appName("Export CSV") \
    .getOrCreate()

# Supongamos que tienes un DataFrame llamado "df" que deseas exportar a CSV
# Puedes leer datos desde un archivo o generar el DataFrame desde otras fuentes.

# Ruta donde deseas exportar el archivo CSV
ruta_exportacion = "archivo.csv"

# Utilizar el método "write" del DataFrame para exportar a CSV
df.write.csv(ruta_exportacion, header=True, mode="overwrite")

# Detener la sesión de Spark al finalizar
spark.stop()