In [1]:
from pyspark.sql import SparkSession

# Inicializando a sessão Spark
spark = SparkSession.builder \
    .appName("PredictDropout") \
    .getOrCreate()

# Verificando se a sessão foi criada
print(spark)

<pyspark.sql.session.SparkSession object at 0x000002114D0D72D0>


In [2]:
# Lista de tabelas
tables = [
    "alunos", "curriculum", "disciplinas_1979", "disciplinas_1990", "disciplinas_1999", "disciplinas_2017", "disciplinas_2023",
    "matriculas", "tabela_alunos", "tabela_cursos", "tabela_dados_ingresso", "tabela_dados_pessoais", "tabela_disciplinas", "tabela_motivo_evasao"
]

# Carregando as tabelas
dataframes = {table: spark.read.csv(f"E:/Mestrado UFCG/Semestre 2024.2/Dados/Tabelas_0/{table}.csv", header=True, inferSchema=True) for table in tables}

In [3]:
from pyspark.sql.functions import col, mean, when, trim, lower

# Função para tratamento de valores ausentes
def handle_missing_values(df, strategy="mean"):
    for column in df.columns:
        if df.select(column).distinct().count() > 1:  # Evita erro com colunas únicas
            if strategy == "mean" and df.schema[column].dataType in ['int', 'double', 'float']:
                mean_value = df.select(mean(col(column))).collect()[0][0]
                df = df.withColumn(column, when(col(column).isNull(), mean_value).otherwise(col(column)))
            elif strategy == "median" and df.schema[column].dataType in ['int', 'double', 'float']:
                median_value = df.approxQuantile(column, [0.5], 0.01)[0]
                df = df.withColumn(column, when(col(column).isNull(), median_value).otherwise(col(column)))
            elif strategy == "mode":
                mode_value = df.groupBy(column).count().orderBy(col("count").desc()).first()[0]
                df = df.withColumn(column, when(col(column).isNull(), mode_value).otherwise(col(column)))
            elif strategy == "drop":
                df = df.na.drop()
    return df

# Função para remoção de outliers
def remove_outliers(df, column):
    if df.schema[column].dataType in ['int', 'double', 'float']:
        quantiles = df.approxQuantile(column, [0.25, 0.75], 0.01)
        q1, q3 = quantiles[0], quantiles[1]
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        df = df.filter((col(column) >= lower_bound) & (col(column) <= upper_bound))
    return df

# Função para padronização de categorias
def standardize_categories(df, column):
    df = df.withColumn(column, col(column).cast("string"))
    df = df.withColumn(column, trim(lower(col(column))))  # Remove espaços e converte para minúsculas
    return df

In [6]:
# Aplicando as transformações nas tabelas
for table, df in dataframes.items():
    print(f"Processando tabela: {table}")
    df = handle_missing_values(df, strategy="mean")  # Tratamento de valores ausentes
    for col_name in df.columns:
        if df.schema[col_name].dataType in ['int', 'double', 'float']:
            df = remove_outliers(df, col_name)  # Remoção de outliers
        else:
            df = standardize_categories(df, col_name)  # Normalização de categorias
    
    df = df.dropDuplicates()  # Remoção de duplicatas

    # Verificações antes de salvar
    print(f"Verificando DataFrame: {table}")
    df.printSchema()
    df.show(5)
    print(f"Número de linhas: {df.count()}")

    # Salvando o DataFrame
    try:
        df.write.mode("overwrite").csv(f"file:///E:/Mestrado UFCG/Semestre 2024.2/Dados/Tabelas_0/cleaned_{table}", header=True)
        print(f"Tabela {table} salva com sucesso!")
    except Exception as e:
        print(f"Erro ao salvar a tabela {table}: {e}")

Processando tabela: alunos
Verificando DataFrame: alunos
root
 |-- MATRICULA;ID_CIDADAO;NOME;IDADE;E-MAIL;GENERO;ESTADO_CIVIL_ALUNOS;NACIONALIDADE;LOCAL_NASCIMENTO;ESTADO;TERMO_ESTADO;RAZAO_INATIVIDADE;TIPO_ADMISSAO;TERMO_ADMISSAO;POLITICA_AFIRMATIVA;TIPO_ENSINO_MEDIO;ANO_FORMATURA_ENSINO_MEDIO;CODIGO_CURSO;CODIGO_CURRICULAR; ALUNOS_ATIVOS;EX_ALUNOS;ALUNOS_INATIVOS: string (nullable = true)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|MATRICULA;ID_CIDADAO;NOME;IDADE;E-MAIL;GENERO;ESTADO_CIVIL_ALUNOS;NACIONALIDADE;LOCAL_NASCIMENTO;ESTADO;TERMO_ESTADO;RAZAO_INATIVIDADE;TIPO_ADMISSAO;TERMO_ADMISSAO;POLITICA_AFIRMATIVA;TIPO_ENSINO_MEDIO;ANO_FORMATURA_ENSINO_MEDIO;CODIGO_CURSO;CODIGO_CURRICULAR; ALUNOS_ATIVOS;EX_ALUNOS;ALUNOS_INATIVOS|

### Visualizar tabelas pós limpeza ###

In [13]:
# Lista de tabelas limpas
cleaned_tables = {f"cleaned_{table}": spark.read.csv(f"file:///E:/Mestrado UFCG/Semestre 2024.2/Dados/Tabelas_0/cleaned_{table}", header=True, inferSchema=True) for table in tables}

Py4JJavaError: An error occurred while calling o438.csv.
: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.spark.util.HadoopFSUtils$.listLeafFiles(HadoopFSUtils.scala:180)
	at org.apache.spark.util.HadoopFSUtils$.$anonfun$parallelListLeafFilesInternal$1(HadoopFSUtils.scala:95)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFilesInternal(HadoopFSUtils.scala:85)
	at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFiles(HadoopFSUtils.scala:69)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.bulkListLeafFiles(InMemoryFileIndex.scala:162)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:133)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:96)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:68)
	at org.apache.spark.sql.execution.datasources.DataSource.createInMemoryFileIndex(DataSource.scala:539)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:405)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:538)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:76)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:577)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)


## Visualização com PySpark ##


In [7]:
# Exibir as primeiras 10 linhas de um DataFrame
df.show(10)

# Exibir o schema do DataFrame
df.printSchema()

# Exibir estatísticas descritivas
df.describe().show()

+---------------------------------------+
|MATRICULA;RAZAO_DE_INATIVIDADE_DE_ALUNO|
+---------------------------------------+
|                   107210010;desconh...|
|                     109210017;abandono|
|                     112150428;expulsao|
|                   112210115;desconh...|
|                   112210439;desconh...|
|                     113111428;abandono|
|                     113111803;expulsao|
|                   113210893;desconh...|
|                   114110469;desconh...|
|                   114111357;desconh...|
+---------------------------------------+
only showing top 10 rows

root
 |-- MATRICULA;RAZAO_DE_INATIVIDADE_DE_ALUNO: string (nullable = true)

+-------+---------------------------------------+
|summary|MATRICULA;RAZAO_DE_INATIVIDADE_DE_ALUNO|
+-------+---------------------------------------+
|  count|                                   3761|
|   mean|                                   NULL|
| stddev|                                   NULL|
|    min|

### Converter DataFrame do PySpark para Pandas ###

In [11]:
print(pandas_df.columns)

Index(['MATRICULA;RAZAO_DE_INATIVIDADE_DE_ALUNO'], dtype='object')


In [12]:
sns.countplot(x='curso', data=pandas_df)
plt.title('Contagem de Alunos por Curso')
plt.xlabel('Curso')
plt.ylabel('Contagem')
plt.show()

ValueError: Could not interpret value `curso` for `x`. An entry with this name does not appear in `data`.

In [8]:
# Converter DataFrame do PySpark para Pandas
pandas_df = df.toPandas()

# Exibir as primeiras linhas no Pandas
print(pandas_df.head())

  MATRICULA;RAZAO_DE_INATIVIDADE_DE_ALUNO
0                  107210010;desconhecido
1                      109210017;abandono
2                      112150428;expulsao
3                  112210115;desconhecido
4                  112210439;desconhecido


In [9]:
pip install matplotlib seaborn

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip available: 22.3.1 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [10]:
import matplotlib.pyplot as plt
import seaborn as sns

# Exemplo: Contagem de valores em uma coluna categórica
sns.countplot(x='coluna_categorica', data=pandas_df)
plt.title('Contagem de Valores por Categoria')
plt.xlabel('Categoria')
plt.ylabel('Contagem')
plt.show()

ValueError: Could not interpret value `coluna_categorica` for `x`. An entry with this name does not appear in `data`.