In [71]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import datetime # Para gerar timestamps no nome do arquivo de schema

# Inicializa a SparkSession
# O .builder fornece uma interface para configurar o Spark.
# .appName() define um nome para sua aplicação Spark (útil para monitoramento).
# .getOrCreate() tenta obter uma SparkSession existente ou cria uma nova se não houver.
spark = SparkSession.builder \
    .appName("MeuPrimeiroSparkSession") \
    .getOrCreate()

# Opcional: Para verificar se a sessão foi criada, você pode imprimir o objeto spark
print("SparkSession inicializada com sucesso!")
print(spark)

SparkSession inicializada com sucesso!
<pyspark.sql.session.SparkSession object at 0x78dc97567ec0>


In [72]:

import os
# Removido: from pyspark.sql import SparkSession, pois a sessão 'spark' será global

def read_csv(caminho_do_arquivo_csv: str):
    """
    Lê um arquivo CSV específico usando PySpark e retorna o DataFrame.
    Assume que a SparkSession (variável 'spark') já está inicializada e disponível
    no escopo global ou acessível.
    
    Os parâmetros 'delimiter' e 'charset' são definidos internamente.

    Args:
        caminho_do_arquivo_csv (str): O caminho completo para o arquivo CSV a ser lido.
                                     Ex: "/home/ander/Documentos/projetos/chegadas_turistas_internacionais_brasil/data/chegadas_1989.csv"

    Returns:
        pyspark.sql.DataFrame: O DataFrame contendo os dados do CSV, ou None em caso de erro.
    """
    # Parâmetros fixos para a leitura do CSV
    DELIMITER = ";"
    CHARSET = "windows-1252"

    print(f"--- Iniciando leitura do CSV: {os.path.basename(caminho_do_arquivo_csv)} ---")

    try:
        df = spark.read.format("csv") \
          .option("header", "true") \
          .option("inferSchema", "true") \
          .option("delimiter", DELIMITER) \
          .option("charset", CHARSET) \
          .load(caminho_do_arquivo_csv)

        print(f"CSV '{os.path.basename(caminho_do_arquivo_csv)}' lido com sucesso.")
        return df

    except NameError:
        print("Erro: A variável 'spark' (SparkSession) não está definida. Certifique-se de inicializá-la antes de chamar esta função.")
        return None
    except Exception as e:
        print(f"Erro ao ler o CSV '{caminho_do_arquivo_csv}': {e}")
        return None

In [73]:
import os
from pyspark.sql import DataFrame
from pyspark.sql.types import (
    StringType, IntegerType, LongType, FloatType, DoubleType,
    BooleanType, DateType, TimestampType, StructField
)

# --- 3. A nova função para gerar e salvar o CREATE TABLE MySQL ---
def gerar_e_salvar_create_table_mysql(df: DataFrame, table_name: str, output_dir: str):
    """
    Gera um script SQL 'CREATE TABLE' para MySQL a partir do schema de um DataFrame PySpark
    e o salva em um arquivo.
    """
    mysql_types_map = {
        StringType: "VARCHAR(255)",
        IntegerType: "INT",
        LongType: "BIGINT",
        FloatType: "FLOAT",
        DoubleType: "DOUBLE",
        BooleanType: "BOOLEAN",
        DateType: "DATE",
        TimestampType: "DATETIME",
    }
    print(f"--- Gerando script CREATE TABLE para '{table_name}' ---")
    columns = []
    for field in df.schema.fields:
        col_name = field.name
        col_type = type(field.dataType)
        sql_type = mysql_types_map.get(col_type)
        if sql_type is None:
            print(f"Aviso: Tipo PySpark '{field.dataType}' para a coluna '{col_name}' não mapeado, usando VARCHAR(255).")
            sql_type = "VARCHAR(255)"
        columns.append(f"`{col_name}` {sql_type}")
    
    columns_sql = ",\n    ".join(columns)
    create_table_sql = f"CREATE TABLE IF NOT EXISTS `{table_name}` (\n    {columns_sql}\n);"
    
    os.makedirs(output_dir, exist_ok=True)
    sql_file_path = os.path.join(output_dir, f"{table_name}_create_table.sql")
    
    try:
        with open(sql_file_path, "w") as sql_file:
            sql_file.write(create_table_sql)
        print(f"Script CREATE TABLE salvo com sucesso em: {sql_file_path}")
    except Exception as e:
        print(f"Erro ao salvar o script SQL para '{table_name}': {e}")



In [74]:
# --- Exemplo de uso no seu fluxo ---
if __name__ == "__main__":
    # Caminho do seu CSV (como usuário Ubuntu, você está acostumado com essa estrutura)
    caminho_csv = "/home/ander/Documentos/projetos/chegadas_turistas_internacionais_brasil/data/chegadas_1989.csv"
    
    # Diretório para salvar o script SQL de criação da tabela
    diretorio_sql_output = "/home/ander/Documentos/projetos/chegadas_turistas_internacionais_brasil/schemas"
    
    # Nome da tabela no MySQL
    nome_da_tabela = "chegadas_turistas_1989" # Pode ser dinâmico também

    # 1. Ler o CSV com PySpark
    df_chegadas = read_csv(caminho_csv)

    if df_chegadas is not None:
        print("\nCSV lido com sucesso. Gerando script SQL...")
        # 2. Gerar e salvar o script CREATE TABLE para MySQL
        gerar_e_salvar_create_table_mysql(df_chegadas, nome_da_tabela, diretorio_sql_output)
    else:
        print("\nNão foi possível ler o CSV, pulando a geração do script SQL.")

--- Iniciando leitura do CSV: chegadas_1989.csv ---
CSV 'chegadas_1989.csv' lido com sucesso.

CSV lido com sucesso. Gerando script SQL...
--- Gerando script CREATE TABLE para 'chegadas_turistas_1989' ---
Script CREATE TABLE salvo com sucesso em: /home/ander/Documentos/projetos/chegadas_turistas_internacionais_brasil/schemas/chegadas_turistas_1989_create_table.sql
