In [1]:
from pyspark.sql import SparkSession

In [2]:
spark_server = "spark://spark-master-otmzsp:7077"
minio_server = "http://minio-otmzsp:9000" 

is_incremental = False


In [3]:
spark = SparkSession.builder \
    .appName("Processa Posições Trusted") \
    .master(spark_server) \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.1.0,org.apache.hadoop:hadoop-aws:3.3.4") \
    .config("spark.hadoop.fs.s3a.access.key", "admin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.endpoint", minio_server) \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

In [4]:
spark

In [5]:
trusted = 's3a://trusted/posicoes'

In [7]:
from datetime import datetime, timedelta
import pytz

# Define o fuso horário de Brasília (GMT-3)
br_tz = pytz.timezone('America/Sao_Paulo')

# Obtém a data e hora atual em GMT-3
now_gmt3 = datetime.now(br_tz)

yesterday_time_br = now_gmt3 #- timedelta(days=1)

# Formata a data
datepartition = yesterday_time_br.strftime("%Y-%m-%d")

print(datepartition)


2024-09-20


In [9]:
from delta.tables import DeltaTable

# Verificar se o diretório é uma tabela Delta
if DeltaTable.isDeltaTable(spark, f'{trusted}/datepartition={datepartition}'):
    df_all = spark.readStream.format("delta").load(trusted) if is_incremental \
    else spark.read.format("delta").load(trusted)
    df_all.show()
else:
    print("O caminho especificado não contém uma tabela Delta válida.")

+--------------------------+-------------------------+--------------------+---------------+------------------------+-----------------------+-------------------+---------------+----------------------+---------------------------+-------------------+-------------------+-----------------------------+--------------------+--------------------+-------------+
|veiculo_horario_referencia|veiculo_letreiro_completo|veiculo_linha_codigo|veiculo_sentido|veiculo_letreiro_destino|veiculo_letreiro_origem|qtde_veiculos_linha|veiculo_prefixo|veiculo_acessibilidade|veiculo_horario_utc_captura|   veiculo_latitude|  veiculo_longitude|veiculo_horario_local_captura| tipo_operacao_linha|                  id|datepartition|
+--------------------------+-------------------------+--------------------+---------------+------------------------+-----------------------+-------------------+---------------+----------------------+---------------------------+-------------------+-------------------+-------------------------

In [10]:
df_all.count()

9468677

In [11]:
# Configurações
ES_URL = "http://elasticsearch-otmzsp:9200"  # URL do Elasticsearch
KIBANA_URL = "http://kibana-otmzsp:5601"
# Nome index pattern
INDEX_NAME = "posicoes"  # Nome do índice
INDEX_PATTERN_NAME = INDEX_NAME#+"-*"

In [12]:
try:
    from elasticsearch import Elasticsearch
except ImportError:
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "elasticsearch"])
    from elasticsearch import Elasticsearch


In [13]:
import requests

# Conectar ao Elasticsearch
es = Elasticsearch(ES_URL)  # ajuste a URL se necessário

# Verificar se o índice existe
if not es.indices.exists(index=INDEX_NAME):
    # Mapeamento do índice
    mapping = {
        "mappings": {
            "properties": {
                "veiculo_horario_referencia": {
                    "type": "date"
                },
                "veiculo_letreiro_completo": {
                    "type": "text"
                },
                "veiculo_linha_codigo": {
                    "type": "long"
                },
                "veiculo_sentido": {
                    "type": "text"
                },
                "veiculo_letreiro_destino": {
                    "type": "text"
                },
                "veiculo_letreiro_origem": {
                    "type": "text"
                },
                "qtde_veiculos_linha": {
                    "type": "long"
                },
                "veiculo_prefixo": {
                    "type": "long"
                },
                "veiculo_acessibilidade": {
                    "type": "text"
                },
                "veiculo_horario_utc_captura": {
                    "type": "date"
                },
                "veiculo_latitude": {
                    "type": "float"
                },
                "veiculo_longitude": {
                    "type": "float"
                },
                "veiculo_horario_local_captura": {
                    "type": "date"
                },
                "tipo_operacao_linha": {
                    "type": "text"
                }
            }
        }
    }

    # Criar o índice com o mapeamento
    es.indices.create(index=INDEX_NAME, body=mapping)
    print(f"Índice '{INDEX_NAME}' criado com sucesso.")
else:
    print(f"O índice '{INDEX_NAME}' já existe.")



Índice 'posicoes' criado com sucesso.


In [14]:
from pyspark.sql import functions as F
import requests
import json

# Supondo que df_final já contém a coluna 'veiculo_horario_referencia' (timestamp)

# Criar colunas de data e hora a partir do campo de timestamp
df_final_with_date = df_all.withColumn("day", F.date_format("veiculo_horario_referencia", "yyyy-MM-dd")) \
                           .withColumn("hour", F.hour("veiculo_horario_referencia"))

# Obter dias e horas únicos
unique_dates_hours = df_final_with_date.select("day", "hour").distinct().collect()

# Função para enviar dados para Elasticsearch
BULK_SIZE = 10000  # Definir o tamanho do lote

def insert_data_bulk(data):
    bulk_data = ""
    for document in data:
        document = json.loads(document)
        document_id = document.get("id")  # Assumindo que o ID está no documento com a chave 'id'
        bulk_data += f'{{ "index": {{ "_index": "{INDEX_NAME}", "_id": "{document_id}" }} }}' + '\n'
        bulk_data += f'{json.dumps(document)}' + '\n'  # Serializa o documento de volta para JSON
    
    response = requests.post(f"{ES_URL}/_bulk", data=bulk_data.encode('utf-8'), headers={"Content-Type": "application/x-ndjson"})
    
    if response.status_code == 200:
        response_data = response.json()
        if not response_data["errors"]:
            print(f"Dados enviados com sucesso para o Elasticsearch!")
        else:
            print(f"Erros ao enviar para o Elasticsearch: {response_data}")
    else:
        print(f"Erro ao enviar para o Elasticsearch: {response.text}")

# Loop pelos dias e horários únicos
for row in unique_dates_hours:
    day = row['day']
    hour = row['hour']

    # Filtrar o DataFrame para registros de um dia e hora específicos
    df_filtered = df_final_with_date.filter((F.col("day") == day) & (F.col("hour") == hour))
    
    df_filtered = df_filtered.drop("day", "hour", "datepartition")

    # Converter o DataFrame para JSON
    json_rdd = df_filtered.toJSON().collect()

    # Dividir em lotes e enviar para o Elasticsearch
    data_to_insert = []
    for doc in json_rdd:
        data_to_insert.append(doc)
        
        if len(data_to_insert) >= BULK_SIZE:
            insert_data_bulk(data_to_insert)  # Enviar o lote
            data_to_insert = []  # Limpar a lista
    
    # Enviar os dados restantes que não completaram um lote
    if data_to_insert:
        insert_data_bulk(data_to_insert)


In [21]:
spark.stop()