# Instalando bibliotecas

In [None]:
!pip install pyspark
!pip install boto3

# Importando bibliotecas

In [None]:
import pyspark
import boto3
import os

from io import StringIO 
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains

# Criando a SparkSession

In [None]:
conf = pyspark.SparkConf()

# Criando uma sessão com o Spark que existe localmente(atualmente configurado junto com o JupyterLab)
conf.setMaster("local[1]") 
conf.set("spark.driver.host", "awari-jupyterlab") \
    .set("spark.sql.sources.commitProtocolClass", "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol") \
    .set("parquet.enable.summary-metadata", "false") \
    .set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") \
    .set("spark.driver.port", "20020") \
    .set("spark.hadoop.fs.s3a.endpoint", 'awari-nginx:9000') \
    .set("spark.hadoop.fs.s3a.endpoint.region", 'sa-east-1') \
    .set("spark.hadoop.fs.s3a.access.key", 'XKQmEpTesp9sCSQG') \
    .set("spark.hadoop.fs.s3a.secret.key", 'ba4xzjLWC6zEC52jEVJw3oc3gvuEpkzX') \
    .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .set("spark.hadoop.com.amazonaws.services.s3.enableV2", "true") \
    .set("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "replace") \
    .set("spark.hadoop.fs.s3a.fast.upload", True) \
    .set("spark.hadoop.fs.s3a.path.style.access", True) \
    .set("spark.hadoop.fs.s3a.committer.name", "directory") \
    .set("spark.hadoop.fs.s3a.committer.staging.tmp.path", "/tmp/staging")

conf.setAppName('AwariAula08-S33')
sc = pyspark.SparkContext(conf=conf)

spark = SparkSession(sc)

# Criando o cliente para conectar ao Minio


In [None]:
client = boto3.client('s3', 
    endpoint_url='http://awari-minio-nginx:9000',
    aws_access_key_id='XKQmEpTesp9sCSQG', #substitua por sua access key id
    aws_secret_access_key='ba4xzjLWC6zEC52jEVJw3oc3gvuEpkzX', #substitua por sua secret access key
    aws_session_token=None,
    config=boto3.session.Config(signature_version='s3v4'),
    verify=False,
    region_name='sa-east-1'
)

# Configurando o diretório e lendo os arquivos

In [None]:
dir_path = "./arquivos/"
csv_files = [file for file in os.listdir(dir_path) if file.endswith('.csv')]

## Importando os CSVs da pasta e salvando como arquivos JSON no MinIO

In [None]:
for file in csv_files:
    path = os.path.join(dir_path, file)
    
    # Lendo o arquivo CSV
    df = spark.read.csv(path, header=True, inferSchema=True)
    
    # Salvando no formato JSON localmente
    local_output_dir = "/tmp/" + file.replace(".csv", ".json")
    df.write.mode("overwrite").json(local_output_dir)
    
    # Identificando os arquivos JSON dentro do diretório
    json_files = [f for f in os.listdir(local_output_dir) if f.endswith('.json')]
    
    for json_file in json_files:
        json_file_path = os.path.join(local_output_dir, json_file)
        s3_path = f'tarefa/json/{file.replace(".csv", "")}/{json_file}'
        
        # Fazendo upload do arquivo JSON para o MinIO
        with open(json_file_path, 'rb') as f:
            client.upload_fileobj(f, 'aula-08', s3_path)

## Importando os JSONs e salvando como arquivos CSV no MinIO.

In [None]:
import glob

bucket_name = 'aula-08'
directory_name = 'tarefa'

# Liste todos os diretórios/objetos no bucket 'aula-08'
objects = client.list_objects(Bucket=bucket_name)['Contents']

# Filtrando os diretórios sob 'tarefa/json'
json_directories = [obj['Key'].split('/')[2] for obj in objects if obj['Key'].startswith(f"{directory_name}/json/")]
json_directories = list(set(json_directories))  # Removendo duplicatas

for json_dir in json_directories:
    # Caminho para os arquivos JSON no diretório atual
    s3_json_path = f"s3a://{bucket_name}/{directory_name}/json/{json_dir}/*.json"
    
    # Lendo todos os arquivos JSON dentro do diretório atual
    try:
        df = spark.read.json(s3_json_path)
    except:
        continue
    
    # Caminho de saída local para o arquivo CSV
    local_dir_path = f"./tmp/{json_dir}"
    try:
        df.coalesce(1).write.format("csv").mode("overwrite").option("header", "true").save(local_dir_path)
    except:
        continue
    
    # Buscar o arquivo CSV gerado dentro do diretório
    csv_file = glob.glob(f"{local_dir_path}/part-*.csv")[0]
    
    # Subindo para o MinIO
    s3_path = f"{directory_name}/csv/{json_dir}/output.csv"
    try:
        with open(csv_file, 'rb') as f:
            client.upload_fileobj(f, bucket_name, s3_path)
    except:
        continue


## Importando os CSVs e salvando como arquivo .parquet no MinIO.

In [None]:
# Definindo os diretórios CSV para os estados e municípios
csv_directories = {
    "estados": f"s3a://{bucket_name}/{directory_name}/csv/estados/output.csv",
    "municipios": f"s3a://{bucket_name}/{directory_name}/csv/municipios/output.csv"
}

# Caminho de saída no MinIO para arquivos parquet
parquet_paths = {
    "estados": f"s3a://{bucket_name}/{directory_name}/parquet/estados",
    "municipios": f"s3a://{bucket_name}/{directory_name}/parquet/municipios"
}

# Lendo CSVs e salvando como Parquet
for key, csv_path in csv_directories.items():
    # Lendo CSV
    df = spark.read.option("header", True).csv(csv_path)
    
    # Salvando como Parquet no MinIO
    df.write.mode("overwrite").parquet(parquet_paths[key])


## Importando os CSVs e salvando como tabelas em um banco de dados PostgreSQL.

In [None]:
!pip install psycopg2-binary

In [None]:
import psycopg2

# Configurações de conexão ao PostgreSQL
host = "127.0.0.1"
port = "5432"  # Atualizado para a porta correta
user = "postgres"
password = "postgres"
database = "postgres"  # Conecta-se primeiro ao banco padrão para criar um novo

# Conecta ao banco de dados
try:
    conn = psycopg2.connect(dbname=database, user=user, password=password, host=host, port=port)
    conn.autocommit = True  # Habilita o autocommit para permitir a criação do banco de dados

    # Cria um novo banco de dados chamado aula-08
    with conn.cursor() as cursor:
        cursor.execute("CREATE DATABASE aula_08")
    print("Banco de dados 'aula-08' criado com sucesso!")
except psycopg2.errors.DuplicateDatabase:
    print("Banco de dados 'aula-08' já existe.")
except Exception as e:
    print(f"Erro ao criar o banco de dados: {e}")
finally:
    if 'conn' in locals():  # Adicionada verificação se 'conn' existe
        conn.close()


In [None]:
db_properties = {
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver"
}
db_url = "jdbc:postgresql://awari-database:5432/aula-08"

# Lendo CSVs e salvando no PostgreSQL
for key, csv_path in csv_directories.items():
    # Lendo CSV
    df = spark.read.option("header", True).csv(csv_path)
    
    # Escrevendo no PostgreSQL
    df.write.jdbc(url=db_url, table=key, mode="overwrite", properties=db_properties)

---

Notebook utilizado para fins educacionais da **Awari**.

**© AWARI. Todos os direitos reservados.**