In [1]:
from ckanapi import RemoteCKAN
import ckanapi.errors
from ckanapi.errors import NotFound, ValidationError
import pandas as pd
from basedosdados import read_sql
import requests
import os
import json
from tqdm import tqdm
from google.cloud import storage

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = '/Users/guialvesp1/.basedosdados/credentials/staging.json'

In [2]:
class Migrator:
    def __init__(self, ckan_remote: RemoteCKAN, package_dict):
        self.ckan_remote = ckan_remote
        self.package_dict = package_dict

    def create(self):
        try:
            self.ckan_remote.action.package_create(**self.package_dict)
        except NotFound as e:
            print(e)

    def update(self):
        try:
            self.ckan_remote.action.package_update(**self.package_dict)
        except NotFound as e:
            print(e)

    def purge(self):
        try:
            self.ckan_remote.action.dataset_purge(id=self.package_dict["name"])
        except NotFound as e:
            print(e)

    def delete(self):
        try:
            self.ckan_remote.action.package_delete(id=self.package_dict["name"])
        except NotFound as e:
            print(e)

    def validate(self):
        try:
            self.ckan_remote.action.bd_dataset_validate(**self.package_dict)
        except NotFound as e:
            print(e)

In [3]:
def download_packages(ORIGINAL_CKAN_URL, env):
    api_url = ORIGINAL_CKAN_URL + "/api/3/action/package_search?q=&rows=3000"
    packages = requests.get(api_url, verify=False).json()["result"]["results"]
    for p in packages:
        if not os.path.isdir(f"packages/"):
            os.mkdir(f"packages/")
        if not os.path.isdir(f"packages/{env}"):
            os.mkdir(f"packages/{env}")
        name = p["name"]
        json.dump(p, open(f"packages/{env}/{name}", "w"))
    return packages

In [4]:
def get_compressed_blob_size(package):
    storage_client = storage.Client()
    bucket = storage_client.bucket('basedosdados-public')
    for i, resource in enumerate(package["resources"]):
        if resource["resource_type"] == "bdm_table":
            if "compressed_file_size" not in resource or resource["compressed_file_size"] is None or resource["compressed_file_size"] == "":
                dataset_id = resource['dataset_id']
                table_id = resource['table_id']
                try:
                    blob = bucket.get_blob(f'one-click-download/{dataset_id}/{table_id}/{table_id}.csv.gz')
                    resource["compressed_file_size"] = int(blob.size)
                except Exception as e:
                    resource["compressed_file_size"] = ""

    return package

In [5]:
def get_uncompressed_blob_size(package):
    storage_client = storage.Client()
    bucket = storage_client.bucket('basedosdados')
    for i, resource in enumerate(package["resources"]):
        if resource["resource_type"] == "bdm_table":
            if "uncompressed_file_size" not in resource or resource["uncompressed_file_size"] is None or resource["uncompressed_file_size"] == "":
                dataset_id = resource['dataset_id']
                table_id = resource['table_id']
                try:
                    blob = bucket.get_blob(f'staging/{dataset_id}/{table_id}/{table_id}.csv')
                    resource["uncompressed_file_size"] = int(blob.size)
                except Exception as e:
                    resource["uncompressed_file_size"] = ""

    return package

In [10]:
LOCAL_CKAN_URL = "http://localhost:5000"
DEV_CKAN_URL = "https://staging.basedosdados.org"
PROD_CKAN_URL = "https://basedosdados.org"

local_packages = download_packages(LOCAL_CKAN_URL, "local")
dev_packages = download_packages(DEV_CKAN_URL, "dev")
prod_packages = download_packages(PROD_CKAN_URL, "prod")

In [7]:
api_key_dev = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJqdGkiOiI1RUxycHVUV0ZRMXoxbXAwYWE5RjNNalpsZnlBbUpGZ3phUEZIYWxTTXg0eUxReV9UdnJjOHRudFhNeVNneHlLTWJSSmVNZGt4U0NwenU4QSIsImlhdCI6MTYyMTEyMzM5NH0.ePC4i_UpPK-98MCLV-pVjRD2cw1dykPFXeJpxNVUPwY'
api_key_prod = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJqdGkiOiIxOEJhU18tMFJVUmhQSy0tR1pkQ2FqZ09nMkNIRUtYZ1FUOThibTY0YlhQcURVTlBucUFnQXNYMDJITHBpblJVaW5lZW9WT0RIdXE2ZTkwaSIsImlhdCI6MTYzNTc3MzIwM30.g0DB16S3X15CECnRo6-hTvUIlfcarGHvUnXRViNKBUc'

# Local packages

In [12]:
update_packages = []
for package in tqdm(local_packages):
        update_packages.append(get_compressed_blob_size(package))

ckan_remote = RemoteCKAN(LOCAL_CKAN_URL, apikey=api_key_dev)

for i, package in tqdm(enumerate(update_packages)):
    try:
        migration = Migrator(ckan_remote, package)
        migration.validate()
        migration.update()
    except Exception as e :
        print(e)
        break

100%|██████████| 100/100 [00:24<00:00,  4.09it/s]
5it [00:01,  3.49it/s]

{'message': [{'loc': ['resources', 0, 'BdmTable', 'compressed_file_size'], 'msg': 'value is not a valid integer', 'type': 'type_error.integer'}, {'loc': ['resources', 1, 'BdmTable', 'compressed_file_size'], 'msg': 'value is not a valid integer', 'type': 'type_error.integer'}, {'loc': ['resources', 2, 'BdmTable', 'compressed_file_size'], 'msg': 'value is not a valid integer', 'type': 'type_error.integer'}, {'loc': ['resources', 3, 'BdmTable', 'compressed_file_size'], 'msg': 'value is not a valid integer', 'type': 'type_error.integer'}, {'loc': ['resources', 4, 'BdmTable', 'compressed_file_size'], 'msg': 'value is not a valid integer', 'type': 'type_error.integer'}, {'loc': ['resources', 5, 'BdmTable', 'compressed_file_size'], 'msg': 'value is not a valid integer', 'type': 'type_error.integer'}, {'loc': ['resources', 6, 'BdmTable', 'compressed_file_size'], 'msg': 'value is not a valid integer', 'type': 'type_error.integer'}, {'loc': ['resources', 7, 'BdmTable', 'compressed_file_size'], '




In [None]:
update_packages = []
for package in tqdm(local_packages):
        update_packages.append(get_uncompressed_blob_size(package))

ckan_remote = RemoteCKAN(LOCAL_CKAN_URL, apikey=api_key_dev)

for i, package in tqdm(enumerate(update_packages)):
    try:
        migration = Migrator(ckan_remote, package)
        migration.validate()
        migration.update()
    except Exception as e :
        print(e)
        break

# DEV packages

In [None]:
update_packages = []
for package in tqdm(dev_packages):
        update_packages.append(get_compressed_blob_size(package))

ckan_remote = RemoteCKAN(DEV_CKAN_URL, apikey=api_key_prod)

for i, package in tqdm(enumerate(update_packages)):
    migration = Migrator(ckan_remote, package)
    migration.validate()
    migration.update()

In [None]:
update_packages = []
for package in tqdm(dev_packages):
        update_packages.append(get_uncompressed_blob_size(package))

ckan_remote = RemoteCKAN(DEV_CKAN_URL, apikey=api_key_prod)

for i, package in tqdm(enumerate(update_packages)):
    migration = Migrator(ckan_remote, package)
    migration.validate()
    migration.update()

# PROD packages

In [None]:
update_packages = []
for package in tqdm(prod_packages):
        update_packages.append(get_compressed_blob_size(package))

ckan_remote = RemoteCKAN(PROD_CKAN_URL, apikey=api_key_prod)

for i, package in tqdm(enumerate(update_packages)):
    migration = Migrator(ckan_remote, package)
    migration.validate()
    migration.update()

In [None]:
update_packages = []
for package in tqdm(prod_packages):
        update_packages.append(get_uncompressed_blob_size(package))

ckan_remote = RemoteCKAN(PROD_CKAN_URL, apikey=api_key_prod)

for i, package in tqdm(enumerate(update_packages)):
    migration = Migrator(ckan_remote, package)
    migration.validate()
    migration.update()