In [None]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = SparkSession.builder \
    .appName("SparkTurboDelta") \
    .master("local[*]") \
    .config("spark.driver.memory", "6g") \
    .config("spark.executor.memory", "6g") \
    .config("spark.sql.shuffle.partitions", "8") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()


25/04/19 16:32:33 WARN Utils: Your hostname, MacBook-Air-de-Igoh.local resolves to a loopback address: 127.0.0.1; using 172.17.1.180 instead (on interface en0)
25/04/19 16:32:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/anaconda3/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/igoh/.ivy2/cache
The jars for the packages stored in: /Users/igoh/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-7d235570-479c-4213-8048-b17fb871e664;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.3.0 in central
	found io.delta#delta-storage;3.3.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 95ms :: artifacts dl 3ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.3.0 from central in [default]
	io.delta#delta-storage;3.3.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |

In [2]:
from concurrent.futures import ThreadPoolExecutor
import requests
import pandas as pd
import zipfile
import io
from io import StringIO
import os

In [3]:
dic_tabelas = {
    'tabela_ncm': {
        'nome': 'ncm',
        'url': 'https://balanca.economia.gov.br/balanca/bd/tabelas/NCM.csv'
    },
    'tabela_sh': {
        'nome': 'sh',
        'url': 'https://balanca.economia.gov.br/balanca/bd/tabelas/NCM_SH.csv'
    },
    'tabela_cuci': {
        'nome': 'ncm_cuci',
        'url': 'https://balanca.economia.gov.br/balanca/bd/tabelas/NCM_CUCI.csv'
    },
    'tabela_isic': {
        'nome': 'ncm_isic',
        'url': 'https://balanca.economia.gov.br/balanca/bd/tabelas/NCM_ISIC.csv'
    },
    'tabela_isic_cuci': {
        'nome': 'isic_cuci',
        'url': 'https://balanca.economia.gov.br/balanca/bd/tabelas/ISIC_CUCI.csv' #apagar
    },
    'tabela_cgce': {
        'nome': 'ncm_cgce',
        'url': 'https://balanca.economia.gov.br/balanca/bd/tabelas/NCM_CGCE.csv'
    },
    'tabela_fator_agregado': {
        'nome': 'ncm_fat_agreg',
        'url': 'https://balanca.economia.gov.br/balanca/bd/tabelas/NCM_FAT_AGREG.csv'
    },
    'tabela_ppe': {
        'nome': 'ncm_ppe',
        'url': 'https://balanca.economia.gov.br/balanca/bd/tabelas/NCM_PPE.csv'
    },
    'tabela_ppi': {
        'nome': 'ncm_ppi',
        'url': 'https://balanca.economia.gov.br/balanca/bd/tabelas/NCM_PPI.csv'
    },
    'tabela_unidade': {
        'nome': 'ncm_unidade',
        'url': 'https://balanca.economia.gov.br/balanca/bd/tabelas/NCM_UNIDADE.csv'
    },
    'tabela_nbm_ncm': {
        'nome': 'nbm_ncm',
        'url': 'https://balanca.economia.gov.br/balanca/bd/tabelas/NBM_NCM.csv'
    },
    'tabela_nbm': {
        'nome': 'nbm',
        'url': 'https://balanca.economia.gov.br/balanca/bd/tabelas/NBM.csv'
    },
    'tabela_uf': {
        'nome': 'estados',
        'url': 'https://balanca.economia.gov.br/balanca/bd/tabelas/UF.csv'
    },
    'tabela_via': {
        'nome': 'via',
        'url': 'https://balanca.economia.gov.br/balanca/bd/tabelas/VIA.csv'
    },
    'tabela_urf': {
        'nome': 'urf',
        'url': 'https://balanca.economia.gov.br/balanca/bd/tabelas/URF.csv'
    },
    'tabela_pais': {
        'nome': 'paises',
        'url': 'https://balanca.economia.gov.br/balanca/bd/tabelas/PAIS.csv'
    },
    'tabela_blocos': {
        'nome': 'blocos',
        'url': 'https://balanca.economia.gov.br/balanca/bd/tabelas/PAIS_BLOCO.csv'
    },
    'tabela_municipios': {
        'nome': 'municipios',
        'url': 'https://balanca.economia.gov.br/balanca/bd/tabelas/UF_MUN.csv'
    }
}


In [4]:
def import_full_export_import(url, caminho_delta, nome_tabela):
    try:
        # Requisição para obter o arquivo ZIP
        response = requests.get(url)
        response.raise_for_status()

        # Abrir o conteúdo como ZIP
        with zipfile.ZipFile(io.BytesIO(response.content)) as z:
            print("Arquivos no zip:", z.namelist())

            # Abrir o primeiro arquivo dentro do ZIP
            with z.open(z.namelist()[0]) as csv_file:
                # Caminhos temporários para o DBFS e local
                local_temp_path = "/tmp/temp_file.csv"  # Caminho local na máquina do driver
                dbfs_temp_path = "dbfs:/tmp/temp_file.csv"

                # Escreve temporariamente no driver local
                with open(local_temp_path, 'wb') as f:
                    f.write(csv_file.read())

                # Se for no Databricks, mova o arquivo para o DBFS
                if 'dbutils' in globals():  # Verifica se está no Databricks
                    try:
                        dbutils.fs.cp(f"file:{local_temp_path}", dbfs_temp_path)
                    except Exception as e:
                        print(f"Erro ao mover o arquivo para DBFS: {e}")
                else:
                    print("Ambiente local detectado, mantendo o arquivo local.")

                # Ler com Spark diretamente do DBFS ou local dependendo do ambiente
                path_to_read = dbfs_temp_path if 'dbutils' in globals() else local_temp_path
                df_spark = spark.read.csv(path_to_read, header=True, sep=';', inferSchema=True, encoding='latin1')

                # Salva como tabela Delta
                df_spark.write.format("delta") \
                    .option("overwriteSchema", True) \
                    .mode("overwrite") \
                    .saveAsTable(f"{caminho_delta}.{nome_tabela}")

                print(f"Tabela {nome_tabela} salva com sucesso em: {caminho_delta}.{nome_tabela}")

                # Remove os arquivos temporários
                os.remove(local_temp_path)
                if 'dbutils' in globals():
                    try:
                        dbutils.fs.rm(dbfs_temp_path)
                    except Exception as e:
                        print(f"Erro ao remover o arquivo do DBFS: {e}")

    except requests.exceptions.RequestException as e:
        print(f"Erro ao acessar a URL: {e}")
    except zipfile.BadZipFile as e:
        print(f"Erro ao abrir o arquivo ZIP: {e}")
    except Exception as e:
        print(f"Erro ao processar o arquivo CSV: {e}")


In [5]:
def baixar_csv(info):
    url = info['url']
    nome = info['nome']
    
    response = requests.get(url)
    response.raise_for_status()
    
    df = pd.read_csv(StringIO(response.text), sep=';')
    return nome, df

def importar_table_assistants(dic_tabelas, nome_banco):

    with ThreadPoolExecutor() as executor:
        resultados = executor.map(baixar_csv, dic_tabelas.values())

        for nome, df in resultados:
            df_spark = spark.createDataFrame(df)
            tabela = f"{nome_banco}.{nome}"
            df_spark.write.format("delta").option('overwriteSchema', True).mode("overwrite").saveAsTable(tabela)
            print(f"{nome} salva como tabela: {tabela}")

In [6]:

spark.sql("CREATE DATABASE IF NOT EXISTS bd_becomex LOCATION 'storage'")

DataFrame[]

In [7]:
importar_table_assistants(dic_tabelas, "bd_becomex")

25/04/19 16:32:42 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

ncm salva como tabela: bd_becomex.ncm


25/04/19 16:32:45 WARN TaskSetManager: Stage 4 contains a task of very large size (1148 KiB). The maximum recommended task size is 1000 KiB.


CodeCache: size=131072Kb used=43644Kb max_used=44203Kb free=87427Kb
 bounds [0x00000001071f8000, 0x0000000109d88000, 0x000000010f1f8000]
 total_blobs=16304 nmethods=15158 adapters=1056
 compilation: disabled (not enough contiguous free space left)
sh salva como tabela: bd_becomex.sh
ncm_cuci salva como tabela: bd_becomex.ncm_cuci
ncm_isic salva como tabela: bd_becomex.ncm_isic
isic_cuci salva como tabela: bd_becomex.isic_cuci
ncm_cgce salva como tabela: bd_becomex.ncm_cgce
ncm_fat_agreg salva como tabela: bd_becomex.ncm_fat_agreg
ncm_ppe salva como tabela: bd_becomex.ncm_ppe


                                                                                

ncm_ppi salva como tabela: bd_becomex.ncm_ppi


                                                                                

ncm_unidade salva como tabela: bd_becomex.ncm_unidade


                                                                                

nbm_ncm salva como tabela: bd_becomex.nbm_ncm


                                                                                

nbm salva como tabela: bd_becomex.nbm


                                                                                

estados salva como tabela: bd_becomex.estados


                                                                                

via salva como tabela: bd_becomex.via


                                                                                

urf salva como tabela: bd_becomex.urf


                                                                                

paises salva como tabela: bd_becomex.paises


                                                                                

blocos salva como tabela: bd_becomex.blocos


                                                                                

municipios salva como tabela: bd_becomex.municipios


In [8]:
#importar_table_assistants(dic_tabelas, "bd_becomex")
import_full_export_import('https://balanca.economia.gov.br/balanca/bd/comexstat-bd/ncm/IMP_COMPLETA.zip', 'bd_becomex', 'import') # importação
import_full_export_import('https://balanca.economia.gov.br/balanca/bd/comexstat-bd/ncm/EXP_COMPLETA.zip', 'bd_becomex', 'export') # exportação

Arquivos no zip: ['IMP_COMPLETA.csv']
Ambiente local detectado, mantendo o arquivo local.


                                                                                

Tabela import salva com sucesso em: bd_becomex.import
Arquivos no zip: ['EXP_COMPLETA.csv']
Ambiente local detectado, mantendo o arquivo local.




Tabela export salva com sucesso em: bd_becomex.export


                                                                                

In [None]:
SELECT fi.*, nj.* -- ou selecione colunas específicas, exceto CO_UNID
FROM bd_becomex.import fi
INNER JOIN ncm_unidade nu ON fi.CO_UNID = nu.CO_UNID
INNER JOIN urf u ON fi.CO_URF = u.CO_URF
INNER JOIN paises p ON fi.CO_PAIS = p.CO_PAIS
INNER JOIN via v ON fi.CO_VIA = v.CO_VIA
INNER JOIN (
    SELECT n.*
    FROM ncm n
    INNER JOIN sh s ON n.CO_SH6 = s.CO_SH6
    INNER JOIN ncm_ppe np ON n.CO_PPE = np.CO_PPE
    INNER JOIN ncm_ppi npi ON n.CO_PPI = npi.CO_PPI
    INNER JOIN ncm_fat_agreg nfa ON n.CO_FAT_AGREG = nfa.CO_FAT_AGREG
    INNER JOIN ncm_cuci nc ON n.CO_CUCI_ITEM = nc.CO_CUCI_ITEM
    INNER JOIN ncm_cgce ncg ON n.CO_CGCE_N3 = ncg.CO_CGCE_N3
    INNER JOIN ncm_isic ni ON n.CO_ISIC_CLASSE = ni.CO_ISIC_CLASSE
) nj ON fi.CO_NCM = nj.CO_NCM

In [9]:
spark.sql("""

SELECT *
FROM bd_becomex.import i
JOIN bd_becomex.ncm_unidade nu ON i.CO_UNID = nu.CO_UNID
JOIN bd_becomex.urf u ON i.CO_URF = u.CO_URF
JOIN bd_becomex.paises p ON i.CO_PAIS = p.CO_PAIS
JOIN bd_becomex.via v ON i.CO_VIA = v.CO_VIA
JOIN bd_becomex.ncm n ON i.CO_NCM = n.CO_NCM
JOIN bd_becomex.sh sh ON n.CO_SH6 = sh.CO_SH6
JOIN bd_becomex.ncm_ppe ppe ON n.CO_PPE = ppe.CO_PPE
JOIN bd_becomex.ncm_ppi ppi ON n.CO_PPI = ppi.CO_PPI
JOIN bd_becomex.ncm_fat_agreg fa ON n.CO_FAT_AGREG = fa.CO_FAT_AGREG
JOIN bd_becomex.ncm_cuci cuci ON n.CO_CUCI_ITEM = cuci.CO_CUCI_ITEM
JOIN bd_becomex.ncm_cgce cgce ON n.CO_CGCE_N3 = cgce.CO_CGCE_N3
JOIN bd_becomex.ncm_isic isic ON n.CO_ISIC_CLASSE = isic.CO_ISIC_CLASSE;



""").limit(10).toPandas()

                                                                                

Unnamed: 0,CO_ANO,CO_MES,CO_NCM,CO_UNID,CO_PAIS,SG_UF_NCM,CO_VIA,CO_URF,QT_ESTAT,KG_LIQUIDO,...,NO_ISIC_GRUPO_ING,NO_ISIC_GRUPO_ESP,CO_ISIC_DIVISAO,NO_ISIC_DIVISAO,NO_ISIC_DIVISAO_ING,NO_ISIC_DIVISAO_ESP,CO_ISIC_SECAO,NO_ISIC_SECAO,NO_ISIC_SECAO_ING,NO_ISIC_SECAO_ESP
0,2020,7,85114000,11,249,AM,1,227600,378,1208,...,Manufacture of parts and accessories for motor...,Fabricação de peças e acessórios para veículos...,29,"Fabricação de veículos automóveis, reboques e ...","Manufacture of motor vehicles, trailers and se...","Fabricação de veículos automóveis, reboques e ...",C,Indústria de Transformação,Manufacturing,Indústria de Transformação
1,2020,3,73102190,10,275,SP,1,817800,10758,10758,...,Manufacture of other fabricated metal products...,Fabricação de outros produtos metálicos fabric...,25,"Fabricação de produtos metálicos fabricados, e...","Manufacture of fabricated metal products, exce...","Fabricação de produtos metálicos fabricados, e...",C,Indústria de Transformação,Manufacturing,Indústria de Transformação
2,2020,10,68042219,10,249,RJ,4,717700,1,1,...,Manufacture of non-metallic mineral products n...,Fabricação de produtos minerais não metálicos n.c,23,Fabricação de outros produtos minerais não met...,Manufacture of other non-metallic mineral prod...,Fabricação de outros produtos minerais não met...,C,Indústria de Transformação,Manufacturing,Indústria de Transformação
3,2020,3,85413019,11,23,AL,4,817600,48,12,...,Manufacture of electronic components and boards,Fabricação de componentes eletrônicos e placas,26,"Fabricação de produtos informáticos, eletrônic...","Manufacture of computer, electronic and optica...","Fabricação de produtos informáticos, eletrônic...",C,Indústria de Transformação,Manufacturing,Indústria de Transformação
4,2020,8,84483290,10,23,MT,4,817600,1,1,...,Manufacture of special-purpose machinery,Fabricação de máquinas para fins especiais,28,Fabricação de máquinas e equipamentos n.c,Manufacture of machinery and equipment n.e.c.,Fabricação de máquinas e equipamentos n.c,C,Indústria de Transformação,Manufacturing,Indústria de Transformação
5,2020,2,84716053,11,351,RO,1,717800,5000,654,...,Manufacture of computers and peripheral equipment,Fabricação de computadores e equipamentos peri...,26,"Fabricação de produtos informáticos, eletrônic...","Manufacture of computer, electronic and optica...","Fabricação de produtos informáticos, eletrônic...",C,Indústria de Transformação,Manufacturing,Indústria de Transformação
6,2020,1,59019000,10,160,RO,1,917800,51726,51726,...,Manufacture of other textiles,Fabricação de outros têxteis,13,Fabricação de têxteis,Manufacture of textiles,Fabricação de têxteis,C,Indústria de Transformação,Manufacturing,Indústria de Transformação
7,2020,8,84213100,11,72,SP,4,817700,1,1,...,Manufacture of general-purpose machinery,Fabricação de máquinas de uso geral,28,Fabricação de máquinas e equipamentos n.c,Manufacture of machinery and equipment n.e.c.,Fabricação de máquinas e equipamentos n.c,C,Indústria de Transformação,Manufacturing,Indústria de Transformação
8,2020,2,85437099,11,493,PR,1,817800,576,24,...,Manufacture of other electrical equipment,Fabricação de outro equipamento elétrico,27,Fabricação de equipamentos elétricos,Manufacture of electrical equipment,Fabricação de equipamentos elétricos,C,Indústria de Transformação,Manufacturing,Indústria de Transformação
9,2020,12,85322390,11,351,AM,4,227700,421510,89,...,Manufacture of electronic components and boards,Fabricação de componentes eletrônicos e placas,26,"Fabricação de produtos informáticos, eletrônic...","Manufacture of computer, electronic and optica...","Fabricação de produtos informáticos, eletrônic...",C,Indústria de Transformação,Manufacturing,Indústria de Transformação


In [10]:
#ncm_joined = (
#    ncm
#    .join(sh, on='CO_SH6', how='inner')
#    .join(ncm_ppe, on='CO_PPE', how='inner')
#    .join(ncm_ppi, on='CO_PPI', how='inner')
#    .join(ncm_fat_agreg, on='CO_FAT_AGREG', how='inner')
#    .join(ncm_cuci, on='CO_CUCI_ITEM', how='inner')
#    .join(ncm_cgce, on='CO_CGCE_N3', how='inner')
#    .join(ncm_isic, on='CO_ISIC_CLASSE', how='inner')
#)
#
#fat_import = spark.read.format("delta").table("bd_becomex.import")
#
#fat_import_joined = (
#    fat_import
#    .join(ncm_unidade, on='CO_UNID', how='inner')
#    .join(urf, on='CO_URF', how='inner')
#    .join(paises, on='CO_PAIS', how='inner')
#    .join(via, on='CO_VIA', how='inner')
#)
#
#fat_export = spark.read.format("delta").table("bd_becomex.export")
#
#fat_export_joined = (
#    fat_export
#    .join(ncm_unidade, on='CO_UNID', how='inner')
#    .join(urf, on='CO_URF', how='inner')
#    .join(paises, on='CO_PAIS', how='inner')
#    .join(via, on='CO_VIA', how='inner')
#)
#
#total_import = (
#    fat_import_joined
#    .merge(ncm_joined, left_on='CO_NCM', right_on='CO_NCM', how='inner')
#)
#total_import = total_import.drop("CO_UNID")

In [11]:
#fat_import = spark.read.format("delta").table("bd_becomex.import")
#
#fat_import_joined = (
#    fat_import
#    .join(ncm_unidade, on='CO_UNID', how='inner')
#    .join(urf, on='CO_URF', how='inner')
#    .join(paises, on='CO_PAIS', how='inner')
#    .join(via, on='CO_VIA', how='inner')
#)
#

In [12]:
#fat_export = spark.read.format("delta").table("bd_becomex.export")
#
#fat_export_joined = (
#    fat_export
#    .join(ncm_unidade, on='CO_UNID', how='inner')
#    .join(urf, on='CO_URF', how='inner')
#    .join(paises, on='CO_PAIS', how='inner')
#    .join(via, on='CO_VIA', how='inner')
#)


In [13]:
#total_import = (
#    fat_import_joined
#    .merge(ncm_joined, left_on='CO_NCM', right_on='CO_NCM', how='inner')
#)
#total_import = total_import.drop("CO_UNID")

In [14]:
#total_export = (
#    fat_export_joined
#    .merge(ncm_joined, left_on='CO_NCM', right_on='CO_NCM', how='inner')
#)
#total_export = total_export.drop("CO_UNID")

In [15]:
#%sql
#OPTIMIZE bd_becomex.Baseimport;
#OPTIMIZE bd_becomex.Baseexport;

In [16]:
spark.sql("""


select count(*) from bd_becomex.blocos


""").limit(10).toPandas()

Unnamed: 0,count(1)
0,322
