In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import to_date, when
from pyspark.sql.functions import regexp_replace, col
import time
import pandas as pd



In [None]:
if 'spark3' in locals():
    spark3.stop()

In [None]:
# Iniciar uma nova sessão Spark
spark2 = SparkSession.builder.appName("CNPJ").getOrCreate()

In [None]:
# Encerre a sessão existente, se houver
if 'spark' in locals():
    spark.stop()

In [None]:
# Define o caminho base para os dados
caminho = "gs://dataproc-staging-us-central1-574457499229-8awbbdyx/notebooks/jupyter/DADOS/DadosProcessados/"

# Função para carregar e cachear as tabelas
def carregar_tabela(nome_tabela):
    arquivo = caminho + f"{nome_tabela}.csv"
    df = spark2.read.csv(arquivo, header=True, inferSchema=True, sep=";")
    df.createOrReplaceTempView(nome_tabela)
    spark2.sql(f"CACHE TABLE {nome_tabela}")

In [None]:
# Carregar e cachear as tabelas
tabelas = ["EMPRESAS", "ESTABELECIMENTOS", "CNAES", "SOCIOS", "SIMPLES"]
for tabela in tabelas:
    carregar_tabela(tabela)

In [None]:
# Verificar se as tabelas estão em cache
for tabela in tabelas:
    cache_status = spark2.catalog.isCached(tabela)
    print(f"Tabela {tabela} está em cache: {cache_status}")

In [None]:
# Exemplo de consultas SQL em todas as tabelas
result_empresas = spark2.sql("SELECT COUNT(*) FROM EMPRESAS").collect()
result_estabelecimentos = spark2.sql("SELECT COUNT(*) FROM ESTABELECIMENTOS").collect()
result_cnaes = spark2.sql("SELECT COUNT(*) FROM CNAES").collect()
result_socios = spark2.sql("SELECT COUNT(*) FROM SOCIOS").collect()
result_simples = spark2.sql("SELECT COUNT(*) FROM SIMPLES").collect()

# Imprimir todos os resultados juntos
print("Count Results:")
print(f"Empresas: {result_empresas[0][0] if result_empresas else 'N/A'}")
print(f"Estabelecimentos: {result_estabelecimentos[0][0] if result_estabelecimentos else 'N/A'}")
print(f"CNAEs: {result_cnaes[0][0] if result_cnaes else 'N/A'}")
print(f"Socios: {result_socios[0][0] if result_socios else 'N/A'}")
print(f"Simples: {result_simples[0][0] if result_simples else 'N/A'}")

Count Results:
Empresas: 57963081
Estabelecimentos: 60944825
CNAEs: 1359
Socios: 24251230
Simples: 39373015

In [None]:
spark2.sql("SELECT COUNT(*) FROM EMPRESAS e JOIN SOCIOS s ON e.cnpjbasico = s.cnpjbasico JOIN ESTABELECIMENTOS est ON e.cnpjbasico = est.cnpjbasico WHERE e.capitalsocial BETWEEN 20000000 AND 50000000 AND UPPER(s.qualificacaosocio) = 'SÓCIO-GERENTE' AND UPPER(s.pais) != 'BRASIL' GROUP BY e.razaosocial HAVING COUNT(s.cnpjbasico) < 3 AND COUNT(DISTINCT est.municipio) >= 2;").show()

In [None]:
import re
# Ler o arquivo CSV
df_queries = pd.read_csv('gs://dataproc-staging-us-central1-574457499229-8awbbdyx/notebooks/jupyter/cnpjnl2sql/queries/queries.csv')

# Função para ajustar a query
def adjust_query(query):
    adjusted_query = re.sub(r'\"public\"\.\"(\w+)\"', r'\1', query)
    adjusted_query = re.sub(r'public\.\"(\w+)\"', r'\1', adjusted_query)
    return adjusted_query

# Criar a nova coluna spark_query
df_queries['spark_query'] = df_queries['postgres_query'].apply(adjust_query)

In [None]:
df_queries

In [None]:
# Inicializar as listas para armazenar os resultados e os tempos de execução do Spark
results = []
execution_times = []
ind=0
# Realizar as consultas na sessão spark2
for index, row in df_queries.iterrows():
    try:
        start_time = time.time()
        query = row['spark_query']
        result = spark2.sql(query).collect()
        end_time = time.time()
        
        execution_times.append(end_time - start_time)
        
        # Armazenar apenas o número dos resultados
        if result:
            results.append(result[0][0] if result else None)
            ind += 1
            print(str(ind) + "-")
            print(query+"-")
            print(result[0][0] if result else None)
        else:
            results.append(None)
    except Exception as e:
        print(f"Error executing query for row {index}: {e}")
        results.append(None)
        execution_times.append(None)

# Adicionar os resultados e os tempos de execução ao DataFrame
df_queries['spark_query_result'] = results
df_queries['spark_query_exec_time'] = execution_times


In [None]:
# Salvar o DataFrame atualizado de volta para um arquivo CSV
df_queries.to_csv('gs://dataproc-staging-us-central1-574457499229-8awbbdyx/notebooks/jupyter/cnpjnl2sql/queries/queries_results_spark.csv', index=False)



In [None]:
# Adicionar os resultados e os tempos de execução ao DataFrame
df_queries['spark_query_result'] = results
df_queries['spark_query_exec_time'] = execution_times

In [None]:
df_queries

In [None]:
# Exibir o DataFrame atualizado para verificar os resultados
pd.set_option('display.max_rows', None)
df_queries