In [2]:
import os
from pyspark.sql import SparkSession

def read_csv_with_pyspark(filename, folder='data'):
    spark = SparkSession.builder \
        .appName("Read CSV with PySpark") \
        .getOrCreate()

    file_path = os.path.join(folder, filename)
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    
    return df

In [3]:
filename = 'colusiones_en_contratacion_SIC.csv'
df = read_csv_with_pyspark(filename)
df.show(5)

+---+-------------------+--------+-----------+----------------------------+----------------------+---------------------+--------------------------+--------------------+--------------+-------------+--------------+
|No.|Fecha de Radicacion|Radicado|       Caso|Falta que origina la sancion|Resolucion de Apertura|Resolucion de Sancion|Tipo de Persona Sancionada|Personas Sancionadas|Identificacion|Multa Inicial|Año Radicacion|
+---+-------------------+--------+-----------+----------------------------+----------------------+---------------------+--------------------------+--------------------+--------------+-------------+--------------+
|  1|2011-06-07 00:00:00|11-71590|VIGILANCIA |        Infringir el artí...|  Resolución de Ape...| Resolución de San...|        Personas Jurídicas|GUARDIANES COMPAÑ...|     860520097| 8.59440305E9|          2011|
|  1|2011-06-07 00:00:00|11-71590|VIGILANCIA |        Infringir el artí...|  Resolución de Ape...| Resolución de San...|        Personas Jurídicas|E

In [10]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, isnan, isnull, min, max, mean, stddev
from pyspark.sql.types import DoubleType, FloatType, IntegerType, LongType, ShortType, TimestampType

def analyze_data_quality(filename, folder='data'):
    spark = SparkSession.builder \
        .appName("Analyze Data Quality with PySpark") \
        .getOrCreate()

    file_path = os.path.join(folder, filename)
    df = spark.read.csv(file_path, header=True, inferSchema=True)

    # Cuenta de registros
    record_count = df.count()
    print(f"Total de registros: {record_count}")

    # Cuenta de registros nulos y duplicados
    null_count = df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).collect()
    duplicate_count = df.count() - df.dropDuplicates().count()

    print("\nConteo de registros nulos por columna:")
    for col_name, nulls in zip(df.columns, null_count[0]):
        print(f"{col_name}: {nulls}")

    print(f"\nConteo de registros duplicados: {duplicate_count}")

    # Estadísticas descriptivas (mínimo, máximo, promedio, desviación estándar) para columnas numéricas
    numeric_columns = [col_name for col_name, dtype in df.dtypes if dtype in ("double", "float", "int", "bigint", "smallint")]
    summary_stats = df.select(numeric_columns).summary("min", "max", "mean", "stddev").collect()

    print("\nEstadísticas descriptivas para columnas numéricas:")
    for stat in summary_stats:
        print(f"{stat['summary']}:")

        for col_name in numeric_columns:
            print(f"  {col_name}: {stat[col_name]}")

    # Identificación de valores atípicos (basado en el rango intercuartil)
    for col_name in numeric_columns:
        q1, q3 = df.approxQuantile(col_name, [0.25, 0.75], 0.01)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr

        outliers = df.filter((col(col_name) < lower_bound) | (col(col_name) > upper_bound)).count()

        print(f"\nValores atípicos en la columna {col_name}: {outliers}")


In [9]:
analyze_data_quality('multas_SECOP.csv', folder='data')

Total de registros: 1444

Conteo de registros nulos por columna:
nombre_entidad: 0
nit_entidad: 0
nivel: 0
orden: 0
numero_de_resolucion: 0
documento_contratista: 0
nombre_contratista: 0
numero_de_contrato: 0
valor_sancion: 0
fecha_de_publicacion: 0
ruta_de_proceso: 0
cod_depto: 11
cod_mpio: 11
dpto: 0
nom_mpio: 0

Conteo de registros duplicados: 0

Estadísticas descriptivas para columnas numéricas:
min:
max:
mean:
stddev:
