In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql.types import * 
from pyspark.sql import SparkSession, DataFrame as SparkDataFrame
import pyspark.sql.functions as F
from pyspark.sql.functions import col,isnan, when, count, coalesce
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, lead
import json
from functools import reduce
import sys
from cassandra.cluster import Cluster
import os
import time

# from mock.tasks import adiciona_carro}
cluster = Cluster(['cassandra'])
session = cluster.connect()

ss = SparkSession.builder.appName("test").getOrCreate()
sql = SQLContext(ss)

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

session.execute("USE simulacao")

import json
params = json.load(open('./mock/parametros.json'))



In [2]:
number = 1

In [3]:
fps = 30
vel_media = 0
n_vel_media = 0

tempo_medio = 0
n_tempo_medio = 0

def atualiza_media(media_atual, tamanho_atual, media_add, tamanho_add):
    if media_add == None:
        return media_atual
    if tamanho_atual == 0:
        return media_add
    tamanho_total = tamanho_atual + tamanho_add
    return (media_atual/tamanho_total)*tamanho_atual + (media_add/tamanho_total)*tamanho_add

def processa_velocidade_media(batch):
    global vel_media, n_vel_media
    batch = batch.na.fill(0, subset=['velocidade'])
    vel_media_batch = batch.agg(F.mean('velocidade')).collect()[0][0]
    length_batch = batch.select(F.count('velocidade')).collect()[0][0]
    n_vel_media += length_batch
    vel_media = atualiza_media(vel_media, n_vel_media, vel_media_batch, length_batch)
    pass

def processa_tempo_cruzamento(batch):
    global n_tempo_medio, tempo_medio
    batch = batch.filter(col("tempo_em_curso") != 0)
    tempo_medio_batch = batch.agg(F.mean('tempo_em_curso')).collect()[0][0]
    length_batch = batch.select(F.count('tempo_em_curso')).collect()[0][0]
    n_tempo_medio += length_batch
    tempo_medio = atualiza_media(tempo_medio, n_tempo_medio, tempo_medio_batch, length_batch)
    pass

def processa_carro(DadosNovos, DadosCarros, colision_tolerance, colision_tolerance_quad, Parametros):
    time0 = time.time()
    DadosCarros = DadosCarros.drop('aplicaMulta')
    time1 = time.time()
    print('Drop tempo: ',time1-time0)
    time0 = time.time()
    # Renomeia coluna do dado novo
    DadosNovos = DadosNovos.select(F.col('pos_y').alias('posicao_nova'), F.col('rodovia').alias('rodovia_nova'),
                                   F.col('pos_x').alias('faixa_nova'),
                                   F.col('tempo_da_simulacao').alias('tempo_da_simulacao_novo'),F.col('placa'))
    
    data_joined = DadosCarros.join(DadosNovos, on="placa", how='right')

    time1 = time.time()
    print('Join tempo: ',time1-time0)
    time0 = time.time()
    
    CarrosSumidos = DadosCarros.join(DadosNovos, on="placa", how='left_anti')
    processa_tempo_cruzamento(CarrosSumidos)
    data_joined = data_joined.withColumn("tempo_inicio", when(col("tempo_inicio").isNull(), col("tempo_da_simulacao_novo")).otherwise(col("tempo_inicio")))
    data_joined = data_joined.withColumn("tempo_em_curso", col("tempo_da_simulacao_novo") - col("tempo_inicio"))
    data_joined = data_joined.withColumn("rodovia", coalesce(col("rodovia_nova"), col("rodovia")))
    data_joined = data_joined.withColumn("troca_de_faixa", col("faixa_nova") != col("faixa"))
    data_joined = data_joined.withColumn("faixa", coalesce(col("faixa_nova"), col("faixa")))
    data_joined = data_joined.drop("rodovia_nova", "faixa_nova")

    time1 = time.time()
    print('Cruzamento tempo: ',time1-time0)
    time0 = time.time()
    
    data_joined = data_joined.withColumn("diferenca_de_posicao", col("posicao_nova") - col("posicao"))
    
    data_joined = data_joined.withColumn("diferenca_de_horario", (col("tempo_da_simulacao_novo") - col("tempo_da_simulacao"))*fps)

    data_joined = data_joined.withColumnRenamed("velocidade", "velocidade_antiga")

    data_joined = data_joined.withColumn("velocidade", col("diferenca_de_posicao") / col("diferenca_de_horario"))
    processa_velocidade_media(data_joined)
    
    data_joined = data_joined.withColumn("diferenca_de_velocidade", col("velocidade") - col("velocidade_antiga"))

    data_joined = data_joined.withColumn("aceleracao", col("diferenca_de_velocidade") / col("diferenca_de_horario"))

    data_joined = data_joined.drop("velocidade_antiga", "posicao", "tempo_da_simulacao",
                     "diferenca_de_posicao", "diferenca_de_horario",
                     "diferenca_de_velocidade")

    data_joined = data_joined.withColumnRenamed("posicao_nova", "posicao")

    data_joined = data_joined.withColumnRenamed("tempo_da_simulacao_novo", "tempo_da_simulacao")

    data_joined = data_joined.withColumn("posicao_prevista", col("posicao")\
                           + col("velocidade")*colision_tolerance\
                           + col("aceleracao")*colision_tolerance_quad)

    time1 = time.time()
    print('With tempo: ',time1-time0)
    time0 = time.time()

    Velocidades_Maximas = Parametros.select(F.col('rodovia'), F.col("VelocidadeMaxima"))
    Aceleracoes_Maximas = Parametros.select(F.col('rodovia'), 0.8*F.col("AceleracaoMaxima"))
    Aceleracoes_Maximas = Aceleracoes_Maximas.withColumnRenamed("(AceleracaoMaxima * 0.8)", "AceleracaoMaxima")
    
    data_joined = data_joined.join(Velocidades_Maximas, on="rodovia", how="left")
    data_joined = data_joined.join(Aceleracoes_Maximas, on="rodovia", how="left")

    acima_vel_df = data_joined.select(F.col('placa'), F.col('acima_vel').alias('acima_vel_antigo'))
    data_joined = data_joined.withColumn("acima_vel", F.when(F.abs(data_joined["velocidade"]) > F.abs(fps/data_joined["VelocidadeMaxima"]), 1).otherwise(0))
    data_joined = data_joined.withColumn("acima_acel", F.when(F.abs(data_joined["aceleracao"]) > F.abs(fps/data_joined["AceleracaoMaxima"]), 1).otherwise(0))

    acima_vel_df = acima_vel_df.join(data_joined.select(F.col('placa'), F.col('acima_vel').alias('acima_vel_novo')), on='placa', how="left")
    acima_vel_df = acima_vel_df.withColumn("aplicaMulta", (F.col('acima_vel_antigo')==0) &  (F.col('acima_vel_novo')==1) )



    data_joined = data_joined.join(acima_vel_df.select(F.col('placa'), F.col('aplicaMulta')), on='placa', how='left')

    data_joined = data_joined.drop("VelocidadeMaxima")
    data_joined = data_joined.drop("AceleracaoMaxima")

    time1 = time.time()
    print('Acima tempo: ',time1-time0)
    time0 = time.time()

    window_spec = Window.partitionBy("rodovia", "faixa").orderBy('posicao')
    
    # Use lag function with the window specification
    lag_column = col("posicao_prevista") - lag(col("posicao_prevista")).over(window_spec)
    lead_column = lead(col("posicao_prevista")).over(window_spec) - col("posicao_prevista")

    
    # Add the lag column to the DataFrame
    data_joined = data_joined.withColumn("Risco_Colisão", when(((lag_column < 0) & (col("rodovia") == lag(col("rodovia")).over(window_spec)) & (col("faixa") == lag(col("faixa")).over(window_spec)))| ((lead_column < 0) & (col("rodovia") == lead(col("rodovia")).over(window_spec)) & (col("faixa") == lead(col("faixa")).over(window_spec))), 1).otherwise(0))

    time1 = time.time()
    print('Window tempo: ',time1-time0)
    time0 = time.time()
    
    # Show the result
    return data_joined
    

In [4]:
df_multas = ss.createDataFrame([], "placa: string, tempo_da_simulacao: int, multa_numero : int")
def aplica_multa(df):
    global df_multas
    df = df.filter(F.col('aplicaMulta') == True)
    df = df.select(F.col('placa'),F.col('tempo_da_simulacao'))
    df2 = df_multas.groupBy('placa').agg(F.count('placa').alias('multa_numero'))
    df2 = df2.withColumn('multa_numero',F.col('multa_numero')+1)    
    df2 = df2.join(df,['placa'],how='right')
    df2 = df2.withColumn('multa_numero', F.when(F.isnull('multa_numero'), 1).otherwise(F.col('multa_numero')))
    df_multas = df2.union(df_multas)

In [5]:
df_perigosa = ss.createDataFrame([], "placa: string, tempo_da_simulacao: int, perigosa_numero : int, tipo: string")
def perigosa_acel(df):
    global df_perigosa
    df = df.filter(F.col('acima_acel') == 1)
    df = df.select(F.col('placa'),F.col('tempo_da_simulacao'))
    df2 = df_multas.groupBy('placa').agg(F.count('placa').alias('perigosa_numero'))
    df2 = df2.withColumn('perigosa_numero',F.col('perigosa_numero')+1)    
    df2 = df2.join(df,['placa'],how='right')
    df2 = df2.withColumn('perigosa_numero', F.when(F.isnull('perigosa_numero'), 1).otherwise(F.col('perigosa_numero')))
    df2 = df2.withColumn('tipo',F.lit('acima_acel'))
    df_perigosa = df2.union(df_perigosa)

def perigosa_faixa(df):
    global df_perigosa
    df = df.filter(F.col('troca_de_faixa') == True)
    df = df.select(F.col('placa'),F.col('tempo_da_simulacao'))
    df2 = df_multas.groupBy('placa').agg(F.count('placa').alias('perigosa_numero'))
    df2 = df2.withColumn('perigosa_numero',F.col('perigosa_numero')+1)    
    df2 = df2.join(df,['placa'],how='right')
    df2 = df2.withColumn('perigosa_numero', F.when(F.isnull('perigosa_numero'), 1).otherwise(F.col('perigosa_numero')))
    df2 = df2.withColumn('tipo',F.lit('troca_de_faixa'))
    df_perigosa = df2.union(df_perigosa)

def perigosa_vel(df):
    global df_perigosa
    df = df.filter(F.col('acima_vel') == 1)
    df = df.select(F.col('placa'),F.col('tempo_da_simulacao'))
    df2 = df_multas.groupBy('placa').agg(F.count('placa').alias('perigosa_numero'))
    df2 = df2.withColumn('perigosa_numero',F.col('perigosa_numero')+1)    
    df2 = df2.join(df,['placa'],how='right')
    df2 = df2.withColumn('perigosa_numero', F.when(F.isnull('perigosa_numero'), 1).otherwise(F.col('perigosa_numero')))
    df2 = df2.withColumn('tipo',F.lit('acima_vel'))
    df_perigosa = df2.union(df_perigosa)


In [10]:
p = [[key]+list(params[key].values()) for key in params.keys()]
p = ss.createDataFrame(p, ["Rodovia"]+list(params[list(params.keys())[0]].keys()))
df_old = ss.createDataFrame([], "placa: string, posicao: int, faixa: int, rodovia: string, tempo_da_simulacao: int, velocidade: double, aceleracao: double, posicao_prevista: double, acima_vel: boolean, aplicaMulta: boolean, tempo_em_curso: int,tempo_inicio: int")

def pipeline(first=1,last=session.execute("select count(*) from simulacao").one()[0]):
    global p
    global df_old
    number = first
    while number < last:
        print(number)
        time0 = time.time()
        r = list(session.execute("SELECT * FROM simulacao WHERE tempo_da_simulacao = "+str(number)+" ALLOW FILTERING;"))
        time1 = time.time()
        print('Leitura do banco tempo: ',time1-time0)
        time0 = time.time()
        if r != []:
            df = ss.createDataFrame(r)
            time1 = time.time()
            print('Criação do df tempo: ',time1-time0)
            time0 = time.time()
            df_old = processa_carro(df, df_old, 1, 0.5, p)
            time1 = time.time()
            print('Processa carros tempo: ',time1-time0)
            time0 = time.time()
            aplica_multa(df_old)
            time1 = time.time()
            print('Aplica multa tempo: ',time1-time0)
            time0 = time.time()
            perigosa_acel(df_old)
            perigosa_faixa(df_old)
            perigosa_vel(df_old)
            time1 = time.time()
            print('Perigosa tempo: ',time1-time0)
            time0 = time.time()
        number+=1
        # last = session.execute("select count(*) from simulacao").one()[0]

pipeline(169,171)

169
Leitura do banco tempo:  0.008861064910888672
Criação do df tempo:  0.025430917739868164
Drop tempo:  0.005769252777099609
Join tempo:  0.025847196578979492
Cruzamento tempo:  0.8729503154754639
With tempo:  1.0059256553649902
Acima tempo:  0.11129879951477051
Window tempo:  0.03922843933105469
Processa carros tempo:  2.0613090991973877
Aplica multa tempo:  1.0990440845489502
Perigosa tempo:  8.29113507270813
170
Leitura do banco tempo:  0.015558481216430664
Criação do df tempo:  0.014931440353393555
Drop tempo:  0.004929065704345703
Join tempo:  0.01680278778076172
Cruzamento tempo:  2.6151578426361084
With tempo:  2.461681604385376
Acima tempo:  0.2825291156768799
Window tempo:  0.05849099159240723
Processa carros tempo:  5.440013885498047
Aplica multa tempo:  2.3897907733917236
Perigosa tempo:  17.91664147377014


In [7]:
df_old.toPandas()

Unnamed: 0,placa,rodovia,faixa,aceleracao,posicao_prevista,acima_vel,tempo_em_curso,tempo_inicio,troca_de_faixa,acima_acel,Risco_Colisão,posicao,tempo_da_simulacao,velocidade,aplicaMulta
0,CHI0G12,BR-116,385.0,,,0,0.0,161.0,,0,0,179.9,161.0,,False
1,SUR8W90,BR-116,385.0,,,1,1.0,160.0,True,0,0,256.45,161.0,4.381667,True
2,URU2Y34,BR-116,655.0,-1.021778,20.629111,1,2.0,159.0,False,0,0,36.5,161.0,-15.36,False
3,VEN6C78,BR-116,745.0,0.023389,784.111694,0,2.0,159.0,False,0,0,784.5,161.0,-0.4,False
4,PAR5Z67,BR-116,835.0,,,0,0.0,161.0,,0,0,710.5,161.0,,False


In [8]:
df_perigosa.toPandas()

Unnamed: 0,placa,perigosa_numero,tempo_da_simulacao,tipo
0,SUR8W90,2,161.0,acima_vel
1,URU2Y34,2,161.0,acima_vel
2,SUR8W90,2,161.0,troca_de_faixa
3,COL1R23,2,160.0,acima_vel
4,URU2Y34,2,160.0,acima_vel
5,COL1R23,2,160.0,troca_de_faixa


In [9]:
df_multas.show()

+-------+------------+------------------+
|  placa|multa_numero|tempo_da_simulacao|
+-------+------------+------------------+
|SUR8W90|           1|             161.0|
|COL1R23|           1|             160.0|
|URU2Y34|           1|             160.0|
+-------+------------+------------------+

