# Carregando Variáveis de Ambiente do Arquivo .env

Usamos python-dotenv para carregar as variáveis de ambiente do arquivo .env.


In [47]:
import os
from dotenv import load_dotenv
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
from sqlalchemy import create_engine, text
import pandas as pd

# Carregar variáveis do arquivo .env
load_dotenv()

# Verificar se as variáveis foram carregadas corretamente
print(f"SPARK_HOME: {os.getenv('SPARK_HOME')}")
print(f"HADOOP_HOME: {os.getenv('HADOOP_HOME')}")

SPARK_HOME: C:\spark
HADOOP_HOME: C:\hadoop


# Configuração do Ambiente Spark

Configuramos a sessão do Spark para otimizar o processamento de grandes volumes de dados.


In [30]:
# Inicializa a sessão do Spark com as configurações fornecidas e suporte ao Hive
spark = SparkSession.builder \
    .appName("TransformaçãoFinas") \
    .config("spark.jars", "C:\spark\jars\mysql-connector-j-9.1.0.jar") \
    .config("spark.sql.session.timeZone", "UTC") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.sql.execution.arrow.maxRecordsPerBatch", "1000") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.maxResultSize", "4g") \
    .config("spark.sql.shuffle.partitions", "32") \
    .config("spark.default.parallelism", "16") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "64MB") \
    .enableHiveSupport() \
    .getOrCreate()

# Verificando configurações
jar_path = spark.conf.get("spark.jars")
print(f"Configured JAR Path: {jar_path}")
print(f"Spark Version: {spark.version}")
print(f"Arrow Enabled: {spark.conf.get('spark.sql.execution.arrow.pyspark.enabled')}")
print(f"Adaptive Query Execution: {spark.conf.get('spark.sql.adaptive.enabled')}")


Configured JAR Path: C:\spark\jars\mysql-connector-j-9.1.0.jar
Spark Version: 3.5.0
Arrow Enabled: true
Adaptive Query Execution: true


In [50]:
# Configurações da conexão SQLAlchemy
username = "alunos_ada"
password = "i#C1C3lP3HX5wckE"
host = "129.148.25.96"
database = "ada_tech"

# String de conexão SQLAlchemy
mysql_url = f"mysql+mysqlconnector://{username}:{password}@{host}/{database}"

# Cria a engine SQLAlchemy
engine = create_engine(mysql_url)

# Caminho do arquivo CSV
csv_file_path = "data/wine.csv"

# Carregar dados do CSV em um DataFrame do Pandas
df_pandas = pd.read_csv(csv_file_path)

# Renomear colunas para remover caracteres não permitidos
df_pandas.columns = df_pandas.columns.str.replace('.', '_')

# Mostrar as primeiras linhas do DataFrame do Pandas
print("Dados carregados do CSV:")
display(df_pandas.head())

# Inferir os tipos de dados das colunas do DataFrame
dtype_mapping = {
    'int64': 'INTEGER',
    'float64': 'FLOAT',
    'object': 'VARCHAR(255)'
}

# Criar a tabela dinamicamente com base nas colunas do DataFrame
columns = ', '.join([f"{col} {dtype_mapping[str(df_pandas[col].dtype)]}" for col in df_pandas.columns])
create_table_query = f"""
CREATE TABLE teophilo_wine_table (
    {columns}
) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
"""

# Executar a query para criar a tabela
with engine.connect() as connection:
    connection.execute(text("DROP TABLE IF EXISTS teophilo_wine_table"))
    connection.execute(text(create_table_query))


print("Tabela `teophilo_wine_table` criada com sucesso no MySQL!")

# Escrever o DataFrame no banco de dados MySQL na tabela "teophilo_wine_table"
df_pandas.to_sql(
    name="teophilo_wine_table",  # Nome da tabela no banco de dados
    con=engine,                  # Conexão SQLAlchemy
    if_exists="append",          # Comportamento se a tabela já existir: "fail", "replace", ou "append"
    index=False                  # Evitar escrever o índice do DataFrame como uma coluna
)

print("Dados escritos com sucesso no MySQL!")

# Ler os dados da tabela recém-criada e exibir as 5 primeiras linhas
df_from_sql = pd.read_sql("teophilo_wine_table", con=engine)
print("Dados da tabela `teophilo_wine_table`:")
display(df_from_sql.head())


Dados carregados do CSV:


Unnamed: 0,Wine,Alcohol,Malic_acid,Ash,Acl,Mg,Phenols,Flavanoids,Nonflavanoid_phenols,Proanth,Color_int,Hue,OD,Proline
0,1,14.23,1.71,2.43,15.6,127,2.8,3.06,0.28,2.29,5.64,1.04,3.92,1065
1,1,13.2,1.78,2.14,11.2,100,2.65,2.76,0.26,1.28,4.38,1.05,3.4,1050
2,1,13.16,2.36,2.67,18.6,101,2.8,3.24,0.3,2.81,5.68,1.03,3.17,1185
3,1,14.37,1.95,2.5,16.8,113,3.85,3.49,0.24,2.18,7.8,0.86,3.45,1480
4,1,13.24,2.59,2.87,21.0,118,2.8,2.69,0.39,1.82,4.32,1.04,2.93,735


Tabela `teophilo_wine_table` criada com sucesso no MySQL!
Dados escritos com sucesso no MySQL!
Dados da tabela `teophilo_wine_table`:


Unnamed: 0,Wine,Alcohol,Malic_acid,Ash,Acl,Mg,Phenols,Flavanoids,Nonflavanoid_phenols,Proanth,Color_int,Hue,OD,Proline
0,1,14.23,1.71,2.43,15.6,127,2.8,3.06,0.28,2.29,5.64,1.04,3.92,1065
1,1,13.2,1.78,2.14,11.2,100,2.65,2.76,0.26,1.28,4.38,1.05,3.4,1050
2,1,13.16,2.36,2.67,18.6,101,2.8,3.24,0.3,2.81,5.68,1.03,3.17,1185
3,1,14.37,1.95,2.5,16.8,113,3.85,3.49,0.24,2.18,7.8,0.86,3.45,1480
4,1,13.24,2.59,2.87,21.0,118,2.8,2.69,0.39,1.82,4.32,1.04,2.93,735


# Criação da Tabela Hive e Carregamento dos Dados

Lemos os dados do arquivo `tabela.csv`, criamos uma tabela Hive e carregamos os dados nesta tabela.



In [None]:
# Carregar dados do CSV
file_path = "data/tabela.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Criar uma tabela Hive a partir do DataFrame
df.createOrReplaceTempView("tabela")

# Executar consulta SQL na tabela Hive
result = spark.sql("SELECT * FROM tabela LIMIT 5")
result.show()


+-------------+--------------------+--------+------------+--------------------+---------------+------+------+----+------------------+--------------------+
|data_extracao|data_atualizacao_car|sigla_uf|id_municipio|           id_imovel|modulos_fiscais|  area|status|tipo|          condicao|           geometria|
+-------------+--------------------+--------+------------+--------------------+---------------+------+------+----+------------------+--------------------+
|   2024-10-04|          2024-09-13|      BA|     2917706|BA-2917706-1A9AF6...|           0.07|4.5953|    AT| IRU|Aguardando analise|POLYGON((-40.0675...|
|   2024-10-04|          2024-09-13|      BA|     2919504|BA-2919504-21D958...|           0.07|4.9206|    AT| IRU|Aguardando analise|POLYGON((-41.8304...|
|   2024-10-04|          2024-09-13|      BA|     2921203|BA-2921203-00882B...|           0.07|4.5962|    AT| IRU|Aguardando analise|POLYGON((-40.5883...|
|   2024-10-04|          2024-09-13|      BA|     2926806|BA-2926806-8

# Informações e Estatísticas da Tabela

Exibimos informações úteis sobre a tabela, como quantidade de linhas, linhas nulas, vazias, duplicadas, tipos de colunas, e outras estatísticas.



In [15]:
# Consultar a quantidade de linhas
qtd_linhas = spark.sql("SELECT COUNT(*) AS qtd_linhas FROM tabela")
qtd_linhas.show()

# Consultar linhas nulas
linhas_nulas = spark.sql("""
    SELECT 
        COUNT(*) AS qtd_linhas_nulas 
    FROM 
        tabela 
    WHERE 
        data_extracao IS NULL 
        OR data_atualizacao_car IS NULL 
        OR sigla_uf IS NULL 
        OR id_municipio IS NULL 
        OR id_imovel IS NULL 
        OR modulos_fiscais IS NULL 
        OR area IS NULL 
        OR status IS NULL 
        OR tipo IS NULL 
        OR condicao IS NULL 
        OR geometria IS NULL
""")
linhas_nulas.show()

# Consultar linhas vazias (para colunas do tipo string)
linhas_vazias = spark.sql("""
    SELECT 
        COUNT(*) AS qtd_linhas_vazias 
    FROM 
        tabela 
    WHERE 
        LENGTH(sigla_uf) = 0 
        OR LENGTH(status) = 0 
        OR LENGTH(tipo) = 0 
        OR LENGTH(condicao) = 0
""")
linhas_vazias.show()

# Consultar linhas duplicadas
linhas_duplicadas = spark.sql("""
    SELECT 
        COUNT(*) AS qtd_linhas_duplicadas 
    FROM (
        SELECT 
            *, 
            COUNT(*) AS count 
        FROM 
            tabela 
        GROUP BY 
            data_extracao, data_atualizacao_car, sigla_uf, id_municipio, id_imovel, modulos_fiscais, area, status, tipo, condicao, geometria
        HAVING 
            count > 1
    ) AS temp
""")
linhas_duplicadas.show()

# Tipos de colunas
tipos_colunas = spark.sql("DESCRIBE tabela")
tipos_colunas.show()

# Exibir todas as estatísticas em uma única consulta
estatisticas = spark.sql("""
    SELECT 
        COUNT(*) AS qtd_linhas,
        SUM(CASE WHEN data_extracao IS NULL OR data_atualizacao_car IS NULL OR sigla_uf IS NULL OR id_municipio IS NULL OR id_imovel IS NULL OR modulos_fiscais IS NULL OR area IS NULL OR status IS NULL OR tipo IS NULL OR condicao IS NULL OR geometria IS NULL THEN 1 ELSE 0 END) AS qtd_linhas_nulas,
        SUM(CASE WHEN LENGTH(sigla_uf) = 0 OR LENGTH(status) = 0 OR LENGTH(tipo) = 0 OR LENGTH(condicao) = 0 THEN 1 ELSE 0 END) AS qtd_linhas_vazias,
        COUNT(DISTINCT id_imovel) AS qtd_distintos_id_imovel,  -- Exemplo para a coluna id_imovel
        COUNT(*) - COUNT(DISTINCT id_imovel) AS qtd_linhas_duplicadas  -- Exemplo para a coluna id_imovel
    FROM 
        tabela
""")
estatisticas.show()

+----------+
|qtd_linhas|
+----------+
|   7705415|
+----------+

+----------------+
|qtd_linhas_nulas|
+----------------+
|               1|
+----------------+

+-----------------+
|qtd_linhas_vazias|
+-----------------+
|                0|
+-----------------+

+---------------------+
|qtd_linhas_duplicadas|
+---------------------+
|                 1126|
+---------------------+

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|       data_extracao|     date|   NULL|
|data_atualizacao_car|     date|   NULL|
|            sigla_uf|   string|   NULL|
|        id_municipio|      int|   NULL|
|           id_imovel|   string|   NULL|
|     modulos_fiscais|   double|   NULL|
|                area|   double|   NULL|
|              status|   string|   NULL|
|                tipo|   string|   NULL|
|            condicao|   string|   NULL|
|           geometria|   string|   NULL|
+--------------------+---------+-------+



# Visualização dos Dados

Realizamos algumas visualizações básicas dos dados usando matplotlib.


In [None]:
# Coletar dados para visualização
data_pd = spark.sql("SELECT some_column, other_column FROM tabela").toPandas()

# Plotar um histograma
plt.hist(data_pd['some_column'], bins=50)
plt.xlabel('Some Column')
plt.ylabel('Frequency')
plt.title('Distribuição da Some Column')
plt.show()
