In [1]:
import glob
import http.client
import json
import os

import pandas as pd

# Reading

In [2]:
#
def read_partials(key, **kargs):
    """Read every CSV file matching a glob pattern into one DataFrame.

    Parameters
    ----------
    key : str
        Glob pattern selecting the partial files (e.g. ``"tablas*.csv"``).
    **kargs
        Forwarded unchanged to ``pandas.read_csv`` for every file.

    Returns
    -------
    pandas.DataFrame
        The rows of all matching files stacked vertically with a fresh
        0..n-1 index.

    Raises
    ------
    ValueError
        If no file matches ``key``.
    """
    # glob returns names in arbitrary, filesystem-dependent order; sorting
    # makes the row order of the result reproducible between runs.
    all_files = sorted(glob.glob(key))
    if not all_files:
        # pd.concat([]) would raise a ValueError that does not mention the
        # pattern; fail with the same exception type but a usable message.
        raise ValueError(f"No files match the pattern {key!r}")

    frames = [pd.read_csv(filename, **kargs) for filename in all_files]
    return pd.concat(frames, axis=0, ignore_index=True)

### Reading tablas

In [3]:
# Stack every "tablas*.csv" partial (semicolon-separated) into one frame.
tablas = read_partials(
    "tablas*.csv",
    delimiter=";",
)
tablas.head()

Unnamed: 0,environment,database,path,last_modified,year,month,day
0,production,tb_raw_mydatabase_01,s3:/tb_raw_mydatabase_01,20200101120000,2021,1,1
1,production,tb_raw_mydatabase_02,s3:/tb_raw_mydatabase_02,20200101120000,2021,1,1
2,production,tb_raw_mydatabase_03,s3:/tb_raw_mydatabase_03,20200101120000,2021,1,1
3,production,tb_raw_mydatabase_01,s3:/tb_raw_mydatabase_01,20200201120000,2021,2,1
4,production,tb_raw_mydatabase_02,s3:/tb_raw_mydatabase_02,20200201120000,2021,2,1


### Reading colunas

In [4]:
# Stack every "colunas*.csv" partial (semicolon-separated) into one frame.
colunas = read_partials(
    "colunas*.csv",
    delimiter=";",
)
colunas.head()

Unnamed: 0,environment,database,column,type,comment,year,month,day
0,production,tb_raw_mydatabase_01,coluna1,text,Exemplo de coluna,2021,1,1
1,production,tb_raw_mydatabase_01,coluna2,text,Exemplo de coluna,2021,1,1
2,production,tb_raw_mydatabase_01,coluna3,text,Exemplo de coluna,2021,1,1
3,production,tb_raw_mydatabase_01,coluna4,text,Exemplo de coluna,2021,1,1
4,production,tb_raw_mydatabase_01,coluna5,text,Exemplo de coluna,2021,1,1


# Merging

In [5]:
# Pair each table snapshot with the column rows recorded for the same database
# on the same date. Columns present on both sides but not part of the key
# (e.g. `environment`) are kept twice, with `_left` / `_right` suffixes.
merge_keys = ['database', 'year', 'month', 'day']
tablas_cols = tablas.merge(
    colunas,
    on=merge_keys,
    suffixes=('_left', '_right'),
)
tablas_cols.head()

Unnamed: 0,environment_left,database,path,last_modified,year,month,day,environment_right,column,type,comment
0,production,tb_raw_mydatabase_01,s3:/tb_raw_mydatabase_01,20200101120000,2021,1,1,production,coluna1,text,Exemplo de coluna
1,production,tb_raw_mydatabase_01,s3:/tb_raw_mydatabase_01,20200101120000,2021,1,1,production,coluna2,text,Exemplo de coluna
2,production,tb_raw_mydatabase_01,s3:/tb_raw_mydatabase_01,20200101120000,2021,1,1,production,coluna3,text,Exemplo de coluna
3,production,tb_raw_mydatabase_01,s3:/tb_raw_mydatabase_01,20200101120000,2021,1,1,production,coluna4,text,Exemplo de coluna
4,production,tb_raw_mydatabase_01,s3:/tb_raw_mydatabase_01,20200101120000,2021,1,1,production,coluna5,text,Exemplo de coluna


### Lendo apenas as bases de interesse

In [13]:
# Databases to keep. The list has its own name so that `tablas_inspect` only
# ever refers to the filtered DataFrame.
databases_of_interest = [
    'tb_raw_mydatabase_01',
    'tb_raw_mydatabase_02',
    'tb_raw_mydatabase_03',
]
is_of_interest = tablas_cols["database"].isin(databases_of_interest)
tablas_inspect = tablas_cols.loc[is_of_interest]
tablas_inspect.head()

Unnamed: 0,environment_left,database,path,last_modified,year,month,day,environment_right,column,type,comment
0,production,tb_raw_mydatabase_01,s3:/tb_raw_mydatabase_01,20200101120000,2021,1,1,production,coluna1,text,Exemplo de coluna
1,production,tb_raw_mydatabase_01,s3:/tb_raw_mydatabase_01,20200101120000,2021,1,1,production,coluna2,text,Exemplo de coluna
2,production,tb_raw_mydatabase_01,s3:/tb_raw_mydatabase_01,20200101120000,2021,1,1,production,coluna3,text,Exemplo de coluna
3,production,tb_raw_mydatabase_01,s3:/tb_raw_mydatabase_01,20200101120000,2021,1,1,production,coluna4,text,Exemplo de coluna
4,production,tb_raw_mydatabase_01,s3:/tb_raw_mydatabase_01,20200101120000,2021,1,1,production,coluna5,text,Exemplo de coluna


In [16]:
# `df_tablas` is a second name for the filtered frame (same object, not a copy).
df_tablas = tablas_inspect

# Utils

In [17]:
# CKAN connection settings.
#
# The API key is a secret and must not live in the notebook: it is read from
# the CKAN_API_KEY environment variable. The key that used to be hardcoded in
# this cell should be treated as leaked and rotated on the CKAN side.
# The server keeps its previous address as the default and can be overridden
# through CKAN_SERVER.
AUTHORIZATION = os.environ.get("CKAN_API_KEY", "")
SERVER        = os.environ.get("CKAN_SERVER", "18.219.144.92")

if not AUTHORIZATION:
    print("AVISO: variavel de ambiente CKAN_API_KEY nao definida; "
          "as chamadas autenticadas ao CKAN vao falhar")

# NOTE(review): this is a plain-HTTP connection, so the API key travels
# unencrypted. Switch to http.client.HTTPSConnection if the server offers TLS.
conn = http.client.HTTPConnection(SERVER)

In [18]:
def send_request(rest_type, endpoint, payload, headers, count=3):
    """Send one HTTP request over the shared ``conn`` and decode the JSON reply.

    Parameters
    ----------
    rest_type : str
        HTTP method, e.g. ``"POST"``.
    endpoint : str
        Request path, e.g. ``"/api/3/action/package_show"``.
    payload : str
        Request body.
    headers : dict
        HTTP headers to send.
    count : int, default 3
        Maximum number of attempts. Values below 1 are treated as 1.

    Returns
    -------
    object
        The response body parsed with ``json.loads``.

    Raises
    ------
    http.client.HTTPException, OSError, ValueError
        The error of the last attempt, once every attempt has failed.
        ``ValueError`` covers a body that is not valid UTF-8 / JSON.
    """
    attempts = max(int(count), 1)

    for attempt in range(1, attempts + 1):
        try:
            conn.request(rest_type, endpoint, payload, headers)
            res = conn.getresponse()
            data = res.read().decode("utf-8")

            return json.loads(data)
        except (http.client.HTTPException, OSError, ValueError):
            # Drop the socket so the connection is not left half-way through
            # a request; http.client reopens it on the next request().
            conn.close()
            if attempt == attempts:
                # Out of attempts: surface the real error instead of
                # retrying forever.
                raise
            print("reconectando")

In [19]:
def get_package(package_id: str, **kargs):
    """Fetch a package by calling the ``package_show`` action.

    ``package_id`` is sent as the ``id`` field of the JSON body. Extra keyword
    arguments are forwarded to ``send_request``, whose return value is
    returned unchanged.
    """
    body = json.dumps(dict(id=package_id))
    request_headers = dict([
        ("Authorization", AUTHORIZATION),
        ("Content-Type", "application/json"),
    ])

    return send_request(
        "POST", "/api/3/action/package_show", body, request_headers, **kargs
    )

In [20]:
def set_package(data, **kargs):
    """Create a package by calling the ``package_create`` action.

    ``data`` is serialised to JSON and sent as the request body. Extra keyword
    arguments are forwarded to ``send_request``, whose return value is
    returned unchanged.
    """
    body = json.dumps(data)
    request_headers = dict([
        ("Authorization", AUTHORIZATION),
        ("Content-Type", "application/json"),
    ])

    return send_request(
        "POST", "/api/3/action/package_create", body, request_headers, **kargs
    )

In [21]:
def set_datastore(data, **kargs):
    """Create a datastore by calling the ``datastore_create`` action.

    ``data`` is serialised to JSON and sent as the request body. Extra keyword
    arguments are forwarded to ``send_request``, whose return value is
    returned unchanged.
    """
    body = json.dumps(data)
    request_headers = dict([
        ("Authorization", AUTHORIZATION),
        ("Content-Type", "application/json"),
    ])

    return send_request(
        "POST", "/api/3/action/datastore_create", body, request_headers, **kargs
    )

### Lendo os pacotes ja carregados no CKAN

In [32]:
# Distinct database names present in the filtered frame.
databases = df_tablas["database"].unique().tolist()
databases

['tb_raw_mydatabase_01', 'tb_raw_mydatabase_02', 'tb_raw_mydatabase_03']

In [49]:
# Look up every database in the catalogue. Databases that CKAN already knows
# are stored in `ckan_databases` (name -> package); the others stay in
# `databases`, in their original order.
databases_aux  = df_tablas["database"].unique().tolist()
ckan_databases = {}
databases      = []

for database in databases_aux:
    response = get_package(database)
    found = "success" in response and response["success"] == True
    if not found:
        databases.append(database)
        continue

    print(f"Tabela {database} encontrada")
    ckan_databases[database] = response["result"]

reconectando
Tabela tb_raw_mydatabase_01 encontrada
Tabela tb_raw_mydatabase_02 encontrada


# Bases novas

### Isolando as bases que ainda nao foram carregadas

In [50]:
# Rows of the databases that were not found in the catalogue.
tablas_nuevas = df_tablas[~df_tablas["database"].isin(list(ckan_databases))]

### Criando o objeto de pacotes

In [52]:
# For each database still missing from the catalogue, keep the field values of
# its first row after sorting by (year, month, day) descending.
my_tablas = {}
for tabla in databases:
    latest_first = (
        df_tablas[df_tablas["database"] == tabla]
        .sort_values(by=["year", "month", "day"], ascending=False)
        .reset_index(drop=True)
    )

    # to_json() yields {column: {row_label: value}}; after reset_index the
    # first row is labelled "0". The JSON round trip also leaves plain Python
    # values in the resulting dict.
    by_column = json.loads(latest_first.to_json())

    my_tablas[tabla] = {
        field: by_row["0"] for field, by_row in by_column.items()
    }

In [53]:
# Show the record collected for each database.
print(my_tablas)

{'tb_raw_mydatabase_03': {'environment_left': 'production', 'database': 'tb_raw_mydatabase_03', 'path': 's3:/tb_raw_mydatabase_03', 'last_modified': 20200601120000, 'year': 2021, 'month': 6, 'day': 1, 'environment_right': 'production', 'column': 'coluna1', 'type': 'text', 'comment': 'Exemplo de coluna'}}


### Carregando as colunas dentro do objeto de pacotes

In [56]:
def create_new_package(tabla):
    """Build the package description for one table and submit it.

    Reads ``database``, ``last_modified`` and ``path`` from ``tabla``, prints
    the assembled package and returns whatever ``set_package`` returns.
    """
    database = tabla['database']
    extra_pairs = (
        ("badge", "gold"),
        ("last_modified", tabla['last_modified']),
        ("path", tabla['path']),
    )

    package = dict(
        name=database,
        title=database,
        owner_org="bank",
        notes="Base carregada por processo automatico",
        author="dorotheu",
        author_email="dorotheu@teste.com",
        private=True,
        extras=[{"key": key, "value": value} for key, value in extra_pairs],
    )
    print(package)

    return set_package(package)

In [57]:
def create_new_datastore(tabla):
    """Build the datastore description for one table and submit it.

    Reads ``database`` and ``columns`` from ``tabla`` and returns whatever
    ``set_datastore`` returns.
    """
    database = tabla['database']
    resource = dict(
        package_id=database,
        name=database,
        description="Dicionario de Dados",
        format="Redshift",
    )

    return set_datastore({"resource": resource, "fields": tabla['columns']})

In [54]:
# Attach to each record the list of columns found in the rows that share its
# database, year, month and day.
for tabla_name, tabla in my_tablas.items():
    same_snapshot = (
        (df_tablas["database"] == tabla_name)
        & (df_tablas["year"] == tabla["year"])
        & (df_tablas["month"] == tabla["month"])
        & (df_tablas["day"] == tabla["day"])
    )
    db = df_tablas[same_snapshot].reset_index(drop=True)

    # to_json() yields {column: {row_label: value}} with string row labels.
    data = json.loads(db.to_json())
    row_labels = [str(position) for position in range(len(db.index))]

    tabla["columns"] = [
        {
            "id": data["column"][label],
            "type": data["type"][label],
            "info": {
                "label": data["column"][label],
                "notes": data["comment"][label],
            },
        }
        for label in row_labels
    ]
    

In [55]:
# Show the records again to inspect their current content.
print(my_tablas)

{'tb_raw_mydatabase_03': {'environment_left': 'production', 'database': 'tb_raw_mydatabase_03', 'path': 's3:/tb_raw_mydatabase_03', 'last_modified': 20200601120000, 'year': 2021, 'month': 6, 'day': 1, 'environment_right': 'production', 'column': 'coluna1', 'type': 'text', 'comment': 'Exemplo de coluna', 'columns': [{'id': 'coluna1', 'type': 'text', 'info': {'label': 'coluna1', 'notes': 'Exemplo de coluna'}}, {'id': 'coluna2', 'type': 'text', 'info': {'label': 'coluna2', 'notes': 'Exemplo de coluna'}}, {'id': 'coluna3', 'type': 'text', 'info': {'label': 'coluna3', 'notes': 'Exemplo de coluna'}}, {'id': 'coluna4', 'type': 'text', 'info': {'label': 'coluna4', 'notes': 'Exemplo de coluna'}}, {'id': 'coluna5', 'type': 'text', 'info': {'label': 'coluna5', 'notes': 'Exemplo de coluna'}}]}}


In [58]:
# Create a package and then its datastore for every collected table.
# Each API response is checked: the datastore references the package, so it
# is only attempted once the package itself has been created.
for tabla_name, tabla in my_tablas.items():
    response_package = create_new_package(tabla)
    if not response_package.get("success"):
        print(f"Falha ao criar o pacote {tabla_name}: {response_package.get('error')}")
        continue

    response_datastore = create_new_datastore(tabla)
    if not response_datastore.get("success"):
        print(f"Falha ao criar o datastore de {tabla_name}: {response_datastore.get('error')}")

{'name': 'tb_raw_mydatabase_03', 'title': 'tb_raw_mydatabase_03', 'owner_org': 'bank', 'notes': 'Base carregada por processo automatico', 'author': 'dorotheu', 'author_email': 'dorotheu@teste.com', 'private': True, 'extras': [{'key': 'badge', 'value': 'gold'}, {'key': 'last_modified', 'value': 20200601120000}, {'key': 'path', 'value': 's3:/tb_raw_mydatabase_03'}]}
reconectando
