# AD07 - 113M observações japonesas de `customs` (1988 - 2020)

CSV data for Japan's 100 million customs trade statistics since 1988 until 2020

https://www.kaggle.com/datasets/zanjibar/100-million-data-csv?datasetId=684488

- Coluna 0: ym(Year + month),
- Coluna 1: exp_imp(export: 1, import: 2),
- Coluna 2: hs9(HS code),
- Coluna 3: Customs,
- Coluna 4: Country,
- Coluna 5: Q1,
- Coluna 6: Q2(quantity),
- Coluna 7: Value(in thousands of yen)

In [1]:
import numpy as np
import dask.array as da

## Obter os dados

### Opção #1

A primeira opção envolve baixar o arquivo `zip` original a partir do link kaggle acima, obtendo o arquivo `custom_1988_2020.csv` para em seguida separá-lo por ano com o script abaixo.


In [None]:
%%bash
for ano in $(seq 1988 2020); do
   cat ./csv/custom_1988_2020.csv | grep ^$ano > custom_per_year_${ano}.csv
done

## Versão sequencial


### Como o arquivo é muito grande, vamos ler apenas os dados de *1988*.

In [None]:
a = np.loadtxt("custom_per_year_1988.csv",
                dtype = float,
                delimiter = ",",
                usecols = (0, 1, 7))
# Vamos converter a coluna do valor para trilhões de yens
a[:,2] /= 1000000000
a

### Vamos tentar responder as seguintes perguntas.

Temos agora apenas três colunas:
1. ym(Year + month)
2. exp_imp(export: 1, import: 2)
3. Value(in **trillions of yen**)

In [None]:
a

#### Qual é o valor total de valores importados?
Os valores importados estão registrados na coluna 2 quando a coluna 1 tem o valor de 2.

In [None]:
importados = a[:,1] == 2
importados

In [None]:
a[importados][:,2].sum()

#### E de valores exportados?
Os valores exportados estão registrados na coluna 2 quando a coluna 1 tem o valor de 1.

In [None]:
exportados = a[:,1] == 1
exportados

In [None]:
a[exportados][:,2].sum()

#### A balança foi positiva ou negativa no período?

In [None]:
a[exportados][:,2].sum() - a[importados][:,2].sum()

## Exercício 03 - Parte 1

Instanciar o cluster e apontar nosso cliente para o `dask-scheduler`.

Os vários arquivos `csv` já foram colocados na plataforma computacional.

In [None]:
from dask.distributed import Client, LocalCluster
#client = Client("tcp://gppd-hpc.inf.ufrgs.br:8786")
#client.close()
client = Client()
client

2022-10-24 19:03:19,992 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-op17ez0_', purging
2022-10-24 19:03:19,992 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-enid50s2', purging


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 4,Total memory: 9.49 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:42041,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 4
Started: Just now,Total memory: 9.49 GiB

0,1
Comm: tcp://127.0.0.1:38643,Total threads: 1
Dashboard: http://127.0.0.1:35655/status,Memory: 2.37 GiB
Nanny: tcp://127.0.0.1:39937,
Local directory: /tmp/dask-worker-space/worker-_jkeupt_,Local directory: /tmp/dask-worker-space/worker-_jkeupt_

0,1
Comm: tcp://127.0.0.1:37753,Total threads: 1
Dashboard: http://127.0.0.1:39603/status,Memory: 2.37 GiB
Nanny: tcp://127.0.0.1:44797,
Local directory: /tmp/dask-worker-space/worker-rv7kz638,Local directory: /tmp/dask-worker-space/worker-rv7kz638

0,1
Comm: tcp://127.0.0.1:45155,Total threads: 1
Dashboard: http://127.0.0.1:33575/status,Memory: 2.37 GiB
Nanny: tcp://127.0.0.1:39611,
Local directory: /tmp/dask-worker-space/worker-fe1gjae9,Local directory: /tmp/dask-worker-space/worker-fe1gjae9

0,1
Comm: tcp://127.0.0.1:44521,Total threads: 1
Dashboard: http://127.0.0.1:39823/status,Memory: 2.37 GiB
Nanny: tcp://127.0.0.1:38079,
Local directory: /tmp/dask-worker-space/worker-_sgpgl5j,Local directory: /tmp/dask-worker-space/worker-_sgpgl5j


Uma função de leitura *delayed* capaz de ler um arquivo.

In [None]:
import dask.array as da
from dask import delayed

def le_um_arquivo(filename):
    a = np.loadtxt(filename,
                    dtype = float,
                    delimiter = ",",
                    usecols = (0, 1, 7))
    # Vamos converter a coluna do valor para trilhões de yens
    a[:,2] /= 1000000000
    return a
#

Agora vamos ler os arquivos.

Precisamos chamar `compute_chunk_sizes()` para poder fazer fatiamento nos códigos seguintes.

In [None]:
import glob
import dask

f = []
#file_list = glob.glob("custom_per_year_19[89][89].csv") #altere aqui a expressão regular para ler mais arquivos
file_list = []

for ano in range(1988,1994):   
    file_list.append("custom_per_year_" + str(ano) + ".csv")

file_list



['custom_per_year_1988.csv',
 'custom_per_year_1989.csv',
 'custom_per_year_1990.csv',
 'custom_per_year_1991.csv',
 'custom_per_year_1992.csv',
 'custom_per_year_1993.csv']

In [None]:
for filename in file_list:
    d = dask.delayed(le_um_arquivo)(filename)
    f.append(da.from_delayed(d, (np.nan, np.nan), dtype=float))
a = da.concatenate(f, allow_unknown_chunksizes=True)
a.compute_chunk_sizes()

Unnamed: 0,Array,Chunk
Bytes,371.01 MiB,65.10 MiB
Shape,"(16209786, 3)","(2844283, 3)"
Count,18 Tasks,6 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 371.01 MiB 65.10 MiB Shape (16209786, 3) (2844283, 3) Count 18 Tasks 6 Chunks Type float64 numpy.ndarray",3  16209786,

Unnamed: 0,Array,Chunk
Bytes,371.01 MiB,65.10 MiB
Shape,"(16209786, 3)","(2844283, 3)"
Count,18 Tasks,6 Chunks
Type,float64,numpy.ndarray


In [None]:
%%time
#Calcula balança comercial usando o padrão do numero de chunks = numero de arquivos = 6 arquivos
importados = a[:,1] == 2
exportados = a[:,1] == 1
r = (a[exportados][:,2].sum() - a[importados][:,2].sum())
r.compute()

CPU times: user 3.99 s, sys: 811 ms, total: 4.8 s
Wall time: 37.1 s


63.69925040200002

Temos acima um chunk por arquivo.

Podemos alterar o tamanho do chunk com `rechunk` (cuidado para não gerar muitos chunks).

In [None]:
#Tentamos encontrar um numero de chunks alto multiplo de 4 (nossa máquina do experimento possui 4 cores)
#a = a.rechunk((4200000, 3)) ##->> 4 chunks (47.5s)
#a = a.rechunk((2100000, 3)) ##->> 8 chunks (41.6s)
#a = a.rechunk((1422142, 3)) ##->> 12 chunks (40.5s)
#a = a.rechunk((1050000, 3)) ##->> 16 chunks (43.8s)
a = a.rechunk((520000, 3)) ##->> 32 chunks (38.7s)
#a = a.rechunk((255000, 3)) ##->> 64 chunks (39.7s)
a

Unnamed: 0,Array,Chunk
Bytes,371.01 MiB,11.90 MiB
Shape,"(16209786, 3)","(520000, 3)"
Count,87 Tasks,32 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 371.01 MiB 11.90 MiB Shape (16209786, 3) (520000, 3) Count 87 Tasks 32 Chunks Type float64 numpy.ndarray",3  16209786,

Unnamed: 0,Array,Chunk
Bytes,371.01 MiB,11.90 MiB
Shape,"(16209786, 3)","(520000, 3)"
Count,87 Tasks,32 Chunks
Type,float64,numpy.ndarray


#### A balança foi positiva ou negativa no período?

In [None]:
%%time
#Calculando a balança com o novo número de chunks
importados = a[:,1] == 2
exportados = a[:,1] == 1
r = (a[exportados][:,2].sum() - a[importados][:,2].sum())
# r.visualize()
r.compute()

CPU times: user 4.81 s, sys: 1.05 s, total: 5.86 s
Wall time: 40.7 s


63.69925040200002

## Exercício 3 - Parte 2

#### Questionamento: Qual o mês de 1988 com o maior valor de importação?

In [None]:
for mes in range(198801,198813):   
    print ('Mês:', mes)
    importados = (a[:,0] == mes) & (a[:,1] == 2)   
    r_teste = a[importados][:,2].sum()
    valor = r_teste.compute()
    print ('Soma das importações:', valor)


Mês: 198801
Soma das importações: 1.796164853
Mês: 198802
Soma das importações: 1.927670914
Mês: 198803
Soma das importações: 1.9639650330000002
Mês: 198804
Soma das importações: 1.9719962840000003
Mês: 198805
Soma das importações: 1.9289143020000001
Mês: 198806
Soma das importações: 2.0166218209999998
Mês: 198807
Soma das importações: 2.097989785
Mês: 198808
Soma das importações: 2.1134153319999998
Mês: 198809
Soma das importações: 2.0689340530000004
Mês: 198810
Soma das importações: 2.088890851
Mês: 198811
Soma das importações: 2.0204613350000002
Mês: 198812
Soma das importações: 2.011295296


In [None]:
#Resposta: O mês de agosto de 1988 obteve maior valor de importação (2.1134)