diff --git a/.DS_Store b/.DS_Store
new file mode 100644
index 00000000..1be5a8c1
Binary files /dev/null and b/.DS_Store differ
diff --git a/.github/workflows/mage-ai-develop.yml b/.github/workflows/mage-ai-develop.yml
new file mode 100644
index 00000000..7eaceada
--- /dev/null
+++ b/.github/workflows/mage-ai-develop.yml
@@ -0,0 +1,41 @@
+name: Build and push the mageai image to GHCR for the Amazon EKS helm chart deploy
+
+on:
+  workflow_dispatch:
+  push:
+    paths:
+      - global-api/importer-mage/**
+      - charts/mage-ai/**
+      - .github/workflows/mage-ai-develop.yml
+    branches: ["develop"]
+  pull_request:
+    paths:
+      - global-api/importer-mage/**
+
+jobs:
+
+  pushToGHCR:
+    runs-on: ubuntu-latest
+    defaults:
+      run:
+        working-directory: ./global-api/importer-mage
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Log in to the Container registry
+        uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1
+        with:
+          registry: ghcr.io
+          username: ${{ github.actor }}
+          password: ${{ secrets.GITHUB_TOKEN }}
+
+      - name: Pushing mageai to GHCR
+        env:
+          VERSION: ${{ github.sha }}
+          IMAGE: ghcr.io/open-earth-foundation/citycatalyst-mage-ai
+        run: |
+          docker build -t $IMAGE:$VERSION .
+          docker tag $IMAGE:$VERSION $IMAGE:latest
+          docker push $IMAGE:$VERSION
+          docker push $IMAGE:latest
diff --git a/.gitignore b/.gitignore
index fee8b2d2..d7c001e1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -150,5 +150,6 @@ global-api/importer/osm/cache/**
 global-api/importer/osm/data/**
 global-api/data/**
+
 #Python virtual env
-global-api/env
\ No newline at end of file
+global-api/env
diff --git a/charts/mage-ai/values.yml b/charts/mage-ai/values.yml
new file mode 100644
index 00000000..dba56d55
--- /dev/null
+++ b/charts/mage-ai/values.yml
@@ -0,0 +1,164 @@
+# Default values for mageai.
+# This is a YAML-formatted file.
+# Declare variables to be passed into your templates.
+
+replicaCount: 1
+standaloneScheduler: false
+
+# Effective if standaloneScheduler is true
+scheduler:
+  replicaCount: 1
+  name: mageai-schedulers
+
+
+# Effective if standaloneScheduler is true
+webServer:
+  replicaCount: 1
+  name: mageai-webserver
+
+# Enable redis if you want more replicas
+redis:
+  enabled: false
+  architecture: standalone
+  auth:
+    enabled: false
+  # Your custom redis url (make sure redis.enabled is set to false)
+  customRedisURL: ""
+
+image:
+  repository: ghcr.io/open-earth-foundation/citycatalyst-mage-ai
+  pullPolicy: Always
+  tag: latest
+
+serviceAccount:
+  # Specifies whether a service account should be created
+  create: true
+  # Annotations to add to the service account
+  annotations: {}
+  # The name of the service account to use.
+ # If not set and create is true, a name is generated using the fullname template + name: "mageai" + +podAnnotations: {} + +podSecurityContext: {} + # fsGroup: 2000 + +securityContext: {} + # capabilities: + # drop: + # - ALL + # readOnlyRootFilesystem: true + # runAsNonRoot: true + # runAsUser: 1000 + +service: + type: ClusterIP + port: 6789 + # Annotations to add to the service + annotations: {} + +# Configure extra options for containers' liveness probes +# If not configured, the probe is enabled using the following values +livenessProbe: + enabled: true + path: /api/status + port: http + initialDelaySeconds: 5 + # failureThreshold: + # periodSeconds: + # successThreshold: + # terminationGracePeriodSeconds: + timeoutSeconds: 10 + +# Configure extra options for containers' readiness probes +# If not configured, the probe is enabled using the following values +readinessProbe: + enabled: true + path: /api/status + port: http + initialDelaySeconds: 5 + # failureThreshold: + # periodSeconds: + # successThreshold: + # terminationGracePeriodSeconds: + timeoutSeconds: 1 + +# Custom liveness probe +customLivenessProbe: {} + +# Custom readiness probe +customReadinessProbe: {} + +# We recommend creating the ingress separately instead of creating it using this chart. +# There is a corresponding Mage-Ingress chart to create an ingress for Mage if needed. +# This section is kept here for backwards compatibility. +ingress: + enabled: false + className: "" + annotations: {} + # kubernetes.io/ingress.class: nginx + # kubernetes.io/tls-acme: "true" + hosts: + - host: chart-example.local + paths: + - path: / + pathType: ImplementationSpecific + tls: [] + # - secretName: chart-example-tls + # hosts: + # - chart-example.local + +resources: {} + # We usually recommend not to specify default resources and to leave this as a conscious + # choice for the user. This also increases chances charts run on environments with little + # resources, such as Minikube. If you do want to specify resources, uncomment the following + # lines, adjust them as necessary, and remove the curly braces after 'resources:'. + # limits: + # cpu: 100m + # memory: 128Mi + # requests: + # cpu: 100m + # memory: 128Mi + +nodeSelector: {} + +tolerations: [] + +affinity: {} + +# extraVolumeMounts: +# - name: mage-fs +# mountPath: /home/src + +extraVolumes: + - name: mage-fs + persistentVolumeClaim: + claimName: mage-fs-pvc + +# config: Default configuration for mageai as environment variables. These get injected directly in the container. +config: {} + +# existingSecret: Specifies an existing secret to be used as environment variables. These get injected directly in the container. +existingSecret: "" + +# secrets: Default secrets for mageai as environment variables. These get injected directly in the container. +# Consider using a secret manager first, before sourcing secrets as environment variables. +secrets: {} + +# extraEnvs: Extra environment variables +extraEnvs: + - name: KUBE_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + +env: + - name: REQUIRE_USER_AUTHENTICATION + value: "1" + - name: DEFAULT_OWNER_EMAIL + value: "" + - name: DEFAULT_OWNER_PASSWORD + value: "" + - name: DEFAULT_OWNER_USERNAME + value: "" \ No newline at end of file diff --git a/global-api/import_everything.sh b/global-api/import_everything.sh index 8303e3a5..9ff59df6 100755 --- a/global-api/import_everything.sh +++ b/global-api/import_everything.sh @@ -134,7 +134,8 @@ popd # Import EDGAR extract +# no longer require sample data to be loaded. 
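+# To run the EDGAR pilot-city import manually as a one-off (hypothetical; it
+# simply mirrors the commented-out block below), uncomment that block or run:
+#   pushd importer/edgar/pilot_cities && $python_cmd load_edgar_pilot_city.py --database_uri $DB_URI && popd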
-pushd importer/edgar/pilot_cities -$python_cmd load_edgar_pilot_city.py --database_uri $DB_URI -popd \ No newline at end of file +# pushd importer/edgar/pilot_cities +# $python_cmd load_edgar_pilot_city.py --database_uri $DB_URI +# popd \ No newline at end of file diff --git a/global-api/importer-mage/.gitignore b/global-api/importer-mage/.gitignore new file mode 100644 index 00000000..0114ce57 --- /dev/null +++ b/global-api/importer-mage/.gitignore @@ -0,0 +1,178 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class +.idea +cache/ +/cache/ + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. 
github.com/David-OConnor/pyflow
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+.DS_Store
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+# data files
+mage_ai/server/data/files/**
+default_repo
+instance_metadata.json
+~/.mage_data
+
+# test notebook
+test.ipynb
+
+# test files
+testfiles/
+
+# Docker specific files
+.bash_history
+.jupyter/
+.local/
+.npm/
+.python_history
+mage_ai/frontend/package-lock.json
+docker-compose.override.yml
+
+# DB files
+*.db
+
+# Terraform files
+**/.terraform*
+*.tfstate*
+*.zip
+
+# vscode
+.vscode/
+
+# front-end
+node_modules
+.npmrc
+
+# Scratch files
+scratch*
+/mage_data*
+*secrets*.json
+your-first-project
+
+# raw data
+raw_data/*/extracted/*
\ No newline at end of file
diff --git a/global-api/importer-mage/Dockerfile b/global-api/importer-mage/Dockerfile
new file mode 100644
index 00000000..83384785
--- /dev/null
+++ b/global-api/importer-mage/Dockerfile
@@ -0,0 +1,17 @@
+FROM mageai/mageai:latest
+
+ARG PROJECT_NAME=cc-mage
+ARG MAGE_CODE_PATH=/home/src
+ARG USER_CODE_PATH=${MAGE_CODE_PATH}/${PROJECT_NAME}
+
+WORKDIR ${MAGE_CODE_PATH}
+
+COPY ${PROJECT_NAME} ${PROJECT_NAME}
+
+ENV USER_CODE_PATH=${USER_CODE_PATH}
+
+RUN pip3 install -r ${USER_CODE_PATH}/requirements.txt
+
+ENV PYTHONPATH="${PYTHONPATH}:${MAGE_CODE_PATH}"
+
+CMD ["/bin/sh", "-c", "/app/run_app.sh"]
\ No newline at end of file
diff --git a/global-api/importer-mage/README b/global-api/importer-mage/README
new file mode 100644
index 00000000..69842cfd
--- /dev/null
+++ b/global-api/importer-mage/README
@@ -0,0 +1,39 @@
+### Mage Importer
+
+Responsible for loading data into the CityCatalyst database.
+
+### Requirements
+
+Docker
+
+### Setup
+
+#### Database
+
+You have to create a Postgres database user:
+
+```bash
+createuser ccglobal
+```
+
+Then create a database:
+
+```bash
+createdb ccglobal -O ccglobal
+```
+
+#### Configuration
+
+Copy `dev.env` to `.env` and edit it to match your configuration.
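+
+For reference, a minimal `.env` might look like the sketch below. The variable
+names come from the Postgres section of `io_config.yaml`; the values shown are
+placeholders, not real credentials. The copy command itself follows the sketch.
+
+```bash
+POSTGRES_DBNAME=ccglobal
+POSTGRES_USER=ccglobal
+POSTGRES_PASSWORD=<your-password>
+POSTGRES_HOST=localhost
+POSTGRES_PORT=5432
+```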
+
+```bash
+cp dev.env .env
+```
+
+#### Start Mage-ai
+
+```bash
+docker compose up
+```
+
+Navigate to http://localhost:6789
\ No newline at end of file
diff --git a/global-api/importer-mage/cc-mage/.gitignore b/global-api/importer-mage/cc-mage/.gitignore
new file mode 100755
index 00000000..8b3e82f6
--- /dev/null
+++ b/global-api/importer-mage/cc-mage/.gitignore
@@ -0,0 +1,14 @@
+.DS_Store
+.file_versions
+.gitkeep
+.log
+.logs/
+.mage_temp_profiles
+.preferences.yaml
+.variables/
+__pycache__/
+docker-compose.override.yml
+logs/
+mage-ai.db
+mage_data/
+secrets/
diff --git a/global-api/importer-mage/cc-mage/.ssh_tunnel/aws_emr.json b/global-api/importer-mage/cc-mage/.ssh_tunnel/aws_emr.json
new file mode 100644
index 00000000..9e26dfee
--- /dev/null
+++ b/global-api/importer-mage/cc-mage/.ssh_tunnel/aws_emr.json
@@ -0,0 +1 @@
+{}
\ No newline at end of file
diff --git a/global-api/importer-mage/cc-mage/__init__.py b/global-api/importer-mage/cc-mage/__init__.py
new file mode 100755
index 00000000..e69de29b
diff --git a/global-api/importer-mage/cc-mage/charts/__init__.py b/global-api/importer-mage/cc-mage/charts/__init__.py
new file mode 100755
index 00000000..e69de29b
diff --git a/global-api/importer-mage/cc-mage/custom/__init__.py b/global-api/importer-mage/cc-mage/custom/__init__.py
new file mode 100755
index 00000000..e69de29b
diff --git a/global-api/importer-mage/cc-mage/custom/delta_osm_polygons_generator.py b/global-api/importer-mage/cc-mage/custom/delta_osm_polygons_generator.py
new file mode 100644
index 00000000..bda2404b
--- /dev/null
+++ b/global-api/importer-mage/cc-mage/custom/delta_osm_polygons_generator.py
@@ -0,0 +1,59 @@
+import osmnx as ox
+import pandas as pd
+import geopandas as gpd
+
+from mage_ai.io.postgres import Postgres
+from mage_ai.io.config import ConfigFileLoader
+from os import path
+from mage_ai.settings.repo import get_repo_path
+
+if 'custom' not in globals():
+    from mage_ai.data_preparation.decorators import custom
+
+def process_record(record):
+    osmid = record['osmid']
+    locode = record['locode']
+    try:
+        gdf = ox.geocode_to_gdf(osmid, by_osmid=True)
+        gdf['geometry'] = gdf['geometry'].apply(lambda geom: geom.wkt)
+        gdf_dict = gdf.astype(str).to_dict('records')
+        #gdf_dict = gdf.to_dict('records')
+        df = pd.DataFrame(gdf_dict)
+        df['locode'] = locode
+        df['osmid'] = osmid
+    except Exception as e:
+        # If an error occurs, create a DataFrame with only the 'locode' column
+        df = pd.DataFrame({'locode': [locode], 'osmid': [osmid]})
+        print(f"Error processing record with osmid {osmid}: {e}")
+    return df
+
+def row_generator(df):
+    for index, row in df.iterrows():
+        yield process_record(row.to_dict())
+
+# Define an exporter function to write data to PostgreSQL
+@custom
+def export_data_to_postgres(data, *args, **kwargs):
+    url = 'https://github.com/Open-Earth-Foundation/locode-to-osmid/blob/main/locode_to_osmid.csv?raw=true'
+    df = pd.read_csv(url)
+    df = df[~df.set_index(['locode', 'osmid']).index.isin(data.set_index(['locode', 'osmid']).index)].head(5)
+    data_generator = row_generator(df)
+
+    config_path = path.join(get_repo_path(), 'io_config.yaml')
+    config_profile = 'default'
+
+    # Setup PostgreSQL loader with configuration
+    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
+        for record in data_generator:
+            schema_name = 'raw_data'
+            table_name = 'osm_polygon_staging_delta'
+            # Export each DataFrame to PostgreSQL
+            loader.export(
+                record,
+                schema_name,
+                table_name,
+                if_exists='append',  # Append each record to the table
+                index=False,
+            )
+            loader.commit()  # Commit after each export
+    return df
\ No newline at end of file
diff --git a/global-api/importer-mage/cc-mage/custom/drop_delta_osm_polygons_staging.sql b/global-api/importer-mage/cc-mage/custom/drop_delta_osm_polygons_staging.sql
new file mode 100644
index 00000000..e0d4f797
--- /dev/null
+++ b/global-api/importer-mage/cc-mage/custom/drop_delta_osm_polygons_staging.sql
@@ -0,0 +1 @@
+drop table if exists raw_data.osm_polygon_staging_delta;
\ No newline at end of file
diff --git a/global-api/importer-mage/cc-mage/custom/integrated_polygons.sql b/global-api/importer-mage/cc-mage/custom/integrated_polygons.sql
new file mode 100644
index 00000000..6d4b0ede
--- /dev/null
+++ b/global-api/importer-mage/cc-mage/custom/integrated_polygons.sql
@@ -0,0 +1,6 @@
+--SELECT locode, bbox_north, bbox_south, bbox_east, bbox_west, geometry, 'osm' AS source_system
+--FROM raw_data.osm_polygon
+--WHERE locode NOT IN (SELECT locode FROM raw_data.custom_polygons)
+--UNION issue with different datatypes
+SELECT locode, bbox_north, bbox_south, bbox_east, bbox_west, polygon_wkt AS geometry, 'custom' as source_system
+FROM raw_data.custom_polygons;
diff --git a/global-api/importer-mage/cc-mage/custom/load_climatetrace_sector_custom.sql b/global-api/importer-mage/cc-mage/custom/load_climatetrace_sector_custom.sql
new file mode 100644
index 00000000..16a83735
--- /dev/null
+++ b/global-api/importer-mage/cc-mage/custom/load_climatetrace_sector_custom.sql
@@ -0,0 +1,3 @@
+COPY climatetrace_staging
+FROM 'raw_data/climatetrace/extracted/transportation/DATA/domestic-shipping_emissions_sources.csv'
+WITH (FORMAT CSV, HEADER);
\ No newline at end of file
diff --git a/global-api/importer-mage/cc-mage/custom/load_climatetrace_sector_duckdb.py b/global-api/importer-mage/cc-mage/custom/load_climatetrace_sector_duckdb.py
new file mode 100644
index 00000000..f784ff56
--- /dev/null
+++ b/global-api/importer-mage/cc-mage/custom/load_climatetrace_sector_duckdb.py
@@ -0,0 +1,26 @@
+import duckdb
+
+if 'custom' not in globals():
+    from mage_ai.data_preparation.decorators import custom
+if 'test' not in globals():
+    from mage_ai.data_preparation.decorators import test
+
+
+@custom
+def transform_custom(*args, **kwargs):
+    """
+    Climate Trace has a lot of data and is too memory intensive for pandas,
+    so read the extracted CSVs out-of-core with DuckDB instead.
+    """
+    climatetrace_sector = kwargs['climatetrace_sector']
+    query = f'''
+        SELECT *
+        FROM read_csv_auto('raw_data/climatetrace/extracted/{climatetrace_sector}/DATA/*_emissions_sources.csv')
+    '''
+
+    return duckdb.sql(query).df()
+
+
+@test
+def test_output(output, *args) -> None:
+    """
+    Template code for testing the output of the block.
+ """ + assert output is not None, 'The output is undefined' diff --git a/global-api/importer-mage/cc-mage/custom/load_osm_polygons_generator.py b/global-api/importer-mage/cc-mage/custom/load_osm_polygons_generator.py new file mode 100644 index 00000000..f8e8d095 --- /dev/null +++ b/global-api/importer-mage/cc-mage/custom/load_osm_polygons_generator.py @@ -0,0 +1,64 @@ +import osmnx as ox +import pandas as pd +import geopandas as gpd +import time + +from mage_ai.io.postgres import Postgres +from mage_ai.io.config import ConfigFileLoader +from mage_ai.data_preparation.decorators import data_exporter +import pandas as pd +from os import path +from mage_ai.settings.repo import get_repo_path + +def process_record(record): + osmid = record['osmid'] + locode = record['locode'] + try: + gdf = ox.geocode_to_gdf(osmid, by_osmid=True) + gdf['geometry'] = gdf['geometry'].apply(lambda geom: geom.wkt) + gdf_dict = gdf.astype(str).to_dict('records') + df = pd.DataFrame(gdf_dict) + df['locode'] = locode + df['osmid'] = osmid + + time.sleep(1) + + except Exception as e: + # If an error occurs, create a DataFrame with only the 'locode' column + df = pd.DataFrame({'locode': [locode], 'osmid': [osmid]}) + print(f"Error processing record with osmid {osmid}: {e}") + return df + +def csv_generator(url, country_code=None): + df = pd.read_csv(url) + if country_code: + #df = df[df['locode']=='US PVK'] + df = df[df['locode'].str[:2] == country_code] + + for index, row in df.iterrows(): + yield process_record(row.to_dict()) + +# Define an exporter function to write data to PostgreSQL +@custom +def export_data_to_postgres(*args, **kwargs)-> None: + country = kwargs['country'] + url = 'https://github.com/Open-Earth-Foundation/locode-to-osmid/blob/main/locode_to_osmid.csv?raw=true' + data_generator = csv_generator(url, country) + + config_path = path.join(get_repo_path(), 'io_config.yaml') + config_profile = 'default' + + # Setup PostgreSQL loader with configuration + with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader: + for record in data_generator: + schema_name = 'raw_data' + table_name = 'osm_city_polygon_staging' + # Export each DataFrame to PostgreSQL + loader.export( + record, + schema_name, + table_name, + if_exists='append', # Append each record to the table + index=False, + ) + loader.commit() # Commit after each export \ No newline at end of file diff --git a/global-api/importer-mage/cc-mage/custom/osm_polygons_generator.py b/global-api/importer-mage/cc-mage/custom/osm_polygons_generator.py new file mode 100644 index 00000000..56b72fe8 --- /dev/null +++ b/global-api/importer-mage/cc-mage/custom/osm_polygons_generator.py @@ -0,0 +1,19 @@ +def get_polygon_osmid(osmid, locode): + gdf = ox.geocode_to_gdf(osmid, by_osmid=True) + geometry_wkt = gdf.geometry.apply(lambda geom: geom.wkt) + df_geometry = pd.DataFrame(geometry_wkt, columns=['geometry']) + df_attributes = gdf.drop(columns='geometry') + df_with_geometry = pd.concat([df_attributes, df_geometry], axis=1) + df_with_geometry['locode'] = locode + return df_with_geometry + +def generator_function(data): + for row in data: + yield row + + +@custom +def run_generator_function(data, *args, **kwargs): + row_record = generator_function(data) + print(row_record) + return 1 \ No newline at end of file diff --git a/global-api/importer-mage/cc-mage/custom/update_raw_data_osm_polygons.sql b/global-api/importer-mage/cc-mage/custom/update_raw_data_osm_polygons.sql new file mode 100644 index 00000000..9272e076 --- /dev/null +++ 
b/global-api/importer-mage/cc-mage/custom/update_raw_data_osm_polygons.sql @@ -0,0 +1,60 @@ +CREATE TABLE IF NOT EXISTS raw_data.osm_city_polygon AS +SELECT locode, + osmid, + ST_SetSRID(ST_GeomFromText(geometry), 4326) as geometry, + ST_GeometryType(ST_GeomFromText(geometry)) AS geometry_type, + cast(bbox_north as numeric) as bbox_north, + cast(bbox_south as numeric) as bbox_south, + cast(bbox_east as numeric) as bbox_east, + cast(bbox_west as numeric) as bbox_west, + cast(place_id as numeric) as place_id, + osm_type, + cast(osm_id as numeric) osm_id, + cast(lat as DOUBLE PRECISION) as lat, + cast(lon as DOUBLE PRECISION) as lon, + "_class" as geom_class, + "_type" as geom_type, + cast(place_rank as int) as place_rank, + cast(importance as numeric) as importance, + addresstype, + "_name" as geom_name, + display_name +FROM raw_data.osm_city_polygon_staging; + +DELETE FROM raw_data.osm_city_polygon_staging +WHERE locode IN (SELECT locode FROM raw_data.osm_city_polygon_staging); + +-- Step 2: Drop indexes before insertion +DROP INDEX IF EXISTS osm_emission_i_poly; +DROP INDEX IF EXISTS osm_emission_i; + +INSERT INTO raw_data.osm_city_polygon +SELECT DISTINCT locode, + osmid, + ST_SetSRID(ST_GeomFromText(geometry), 4326) as geometry, + ST_GeometryType(ST_GeomFromText(geometry)) AS geometry_type, + cast(bbox_north as numeric) as bbox_north, + cast(bbox_south as numeric) as bbox_south, + cast(bbox_east as numeric) as bbox_east, + cast(bbox_west as numeric) as bbox_west, + cast(place_id as numeric) as place_id, + osm_type, + cast(osm_id as numeric) osm_id, + cast(lat as DOUBLE PRECISION) as lat, + cast(lon as DOUBLE PRECISION) as lon, + "_class" as geom_class, + "_type" as geom_type, + cast(place_rank as int) as place_rank, + cast(importance as numeric) as importance, + addresstype, + "_name" as geom_name, + display_name +FROM raw_data.osm_city_polygon_staging; + +CREATE INDEX IF NOT EXISTS osm_emission_i_poly +ON raw_data.osm_city_polygon USING gist (geometry); -- Using GiST index for geometry + +CREATE INDEX IF NOT EXISTS osm_emission_i +ON raw_data.osm_city_polygon (locode); + +DROP TABLE IF EXISTS raw_data.osm_city_polygon_staging; \ No newline at end of file diff --git a/global-api/importer-mage/cc-mage/data_exporters/__init__.py b/global-api/importer-mage/cc-mage/data_exporters/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/global-api/importer-mage/cc-mage/data_exporters/export_edgar_attributed_staging.py b/global-api/importer-mage/cc-mage/data_exporters/export_edgar_attributed_staging.py new file mode 100644 index 00000000..26b2bc5a --- /dev/null +++ b/global-api/importer-mage/cc-mage/data_exporters/export_edgar_attributed_staging.py @@ -0,0 +1,44 @@ +import xarray as xr +import pandas as pd + +from mage_ai.settings.repo import get_repo_path +from mage_ai.io.config import ConfigFileLoader +from mage_ai.io.postgres import Postgres +from pandas import DataFrame +from os import path + +if 'data_exporter' not in globals(): + from mage_ai.data_preparation.decorators import data_exporter + + +@data_exporter +def export_data_to_postgres(data, **kwargs) -> None: + """ + Load all the variable attribute details into the database + """ + schema_name = 'raw_data' + table_name = 'edgar_attributes_staging' + config_path = path.join(get_repo_path(), 'io_config.yaml') + config_profile = 'default' + + attr_df = pd.DataFrame() + filepath = data + ds = xr.open_dataset(filepath) + + for var_name, var in ds.variables.items(): + attributes = {f'{var_name}_{attr_name}': attr_value for 
attr_name, attr_value in var.attrs.items()} + df_attributes = pd.DataFrame.from_dict(attributes, orient='index').T + attr_df = pd.concat([attr_df, df_attributes], axis=1) + + edgar_industry = kwargs['edgar_industry'] + attr_df['edgar_sector'] = edgar_industry + + with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader: + loader.export( + attr_df, + schema_name, + table_name, + index=False, + if_exists='replace', + ) + return attr_df \ No newline at end of file diff --git a/global-api/importer-mage/cc-mage/data_exporters/export_edgar_emissions_staging.py b/global-api/importer-mage/cc-mage/data_exporters/export_edgar_emissions_staging.py new file mode 100644 index 00000000..97be3c91 --- /dev/null +++ b/global-api/importer-mage/cc-mage/data_exporters/export_edgar_emissions_staging.py @@ -0,0 +1,36 @@ +import xarray as xr + +from mage_ai.settings.repo import get_repo_path +from mage_ai.io.config import ConfigFileLoader +from mage_ai.io.postgres import Postgres +from pandas import DataFrame +from os import path + +if 'data_exporter' not in globals(): + from mage_ai.data_preparation.decorators import data_exporter + + +@data_exporter +def export_data_to_postgres(data, **kwargs) -> None: + """ + Read in the file and load into the database + """ + schema_name = 'raw_data' # Specify the name of the schema to export data to + table_name = 'edgar_emissions_staging' # Specify the name of the table to export data to + config_path = path.join(get_repo_path(), 'io_config.yaml') + config_profile = 'default' + + filepath = data + ds = xr.open_dataset(filepath) + df_emissions = ds['emissions'].to_dataframe().reset_index() + + with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader: + loader.export( + df_emissions, + schema_name, + table_name, + index=False, + if_exists='replace', + ) + + return df_emissions diff --git a/global-api/importer-mage/cc-mage/data_exporters/extract_file_to_s3.py b/global-api/importer-mage/cc-mage/data_exporters/extract_file_to_s3.py new file mode 100644 index 00000000..18731686 --- /dev/null +++ b/global-api/importer-mage/cc-mage/data_exporters/extract_file_to_s3.py @@ -0,0 +1,42 @@ +import requests +from mage_ai.settings.repo import get_repo_path +from mage_ai.io.config import ConfigFileLoader +from mage_ai.io.s3 import S3 +from pandas import DataFrame +from os import path + +if 'data_exporter' not in globals(): + from mage_ai.data_preparation.decorators import data_exporter + + +@data_exporter +def export_data_to_s3(**kwargs) -> None: + """ + Template for exporting data to a S3 bucket. + Specify your configuration settings in 'io_config.yaml'. + + Docs: https://docs.mage.ai/design/data-loading#s3 + """ + + """ + Load data from a URL and upload it to an S3 bucket. 
+ """ + url = kwargs['url'] + response = requests.get(url) + response.raise_for_status() + file_name = kwargs['file_name'] + + with open(f'{file_name}', 'wb') as f: + f.write(response.content) + + config_path = path.join(get_repo_path(), 'io_config.yaml') + config_profile = 'default' + + bucket_name = 'global-api-raw-data' + object_key = kwargs['object_key'] + + S3.with_config(ConfigFileLoader(config_path, config_profile)).export( + file_name, + bucket_name, + object_key, + ) diff --git a/global-api/importer-mage/cc-mage/data_exporters/load_climatetrace_file.sql b/global-api/importer-mage/cc-mage/data_exporters/load_climatetrace_file.sql new file mode 100644 index 00000000..f75fa74e --- /dev/null +++ b/global-api/importer-mage/cc-mage/data_exporters/load_climatetrace_file.sql @@ -0,0 +1,3 @@ +GRANT pg_read_server_files TO ccglobal; + +COPY raw_data.climatetrace_staging FROM 'raw_data/climatetrace/extracted/transportation/DATA/domestic-shipping_emissions_sources_confidence.csv' WITH (FORMAT csv); \ No newline at end of file diff --git a/global-api/importer-mage/cc-mage/data_exporters/load_climatetrace_geom.py b/global-api/importer-mage/cc-mage/data_exporters/load_climatetrace_geom.py new file mode 100644 index 00000000..a8bb7999 --- /dev/null +++ b/global-api/importer-mage/cc-mage/data_exporters/load_climatetrace_geom.py @@ -0,0 +1,34 @@ +import geopandas as gpd + +from mage_ai.settings.repo import get_repo_path +from mage_ai.io.config import ConfigFileLoader +from mage_ai.io.postgres import Postgres +from pandas import DataFrame +from os import path + +if 'data_exporter' not in globals(): + from mage_ai.data_preparation.decorators import data_exporter + + +@data_exporter +def export_data_to_postgres(df: DataFrame, **kwargs) -> None: + """ + """ + schema_name = 'raw_data' + table_name = 'climatetrace_emissions_geom' + config_path = path.join(get_repo_path(), 'io_config.yaml') + config_profile = 'default' + + climatetrace_sector = kwargs['climatetrace_sector'] + file_path = f'raw_data/climatetrace/extracted/{climatetrace_sector}/DATA/transportation_geometries.gpkg' + df = gpd.read_file(file_path) + + # with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader: + # loader.export( + # df, + # schema_name, + # table_name, + # index=False, + # if_exists='replace', + # ) + return df \ No newline at end of file diff --git a/global-api/importer-mage/cc-mage/data_exporters/load_climatetrace_sector.py b/global-api/importer-mage/cc-mage/data_exporters/load_climatetrace_sector.py new file mode 100644 index 00000000..a5c07f6e --- /dev/null +++ b/global-api/importer-mage/cc-mage/data_exporters/load_climatetrace_sector.py @@ -0,0 +1,57 @@ +from mage_ai.settings.repo import get_repo_path +from mage_ai.io.config import ConfigFileLoader +from mage_ai.io.postgres import Postgres +from pandas import DataFrame +import os +from os import path +import pandas as pd +import numpy as np # Import numpy for NaN values + +if 'data_exporter' not in globals(): + from mage_ai.data_preparation.decorators import data_exporter + + +@data_exporter +def export_data_to_postgres(df: DataFrame, **kwargs) -> None: + """ + Parameterised pipeline to ingest climate trace into the raw data schema + """ + table_name = 'climatetrace_source_staging' + schema_name = 'raw_data' + config_path = path.join(get_repo_path(), 'io_config.yaml') + config_profile = 'default' + + climatetrace_sector = kwargs['climatetrace_sector'] + directory = f'raw_data/climatetrace/extracted/{climatetrace_sector}/DATA/' + + for filename 
in os.listdir(directory): + if filename.endswith("emissions_sources.csv"): + filepath = os.path.join(directory, filename) + chunk_size = 1000 + reader = pd.read_csv(filepath, chunksize=chunk_size) + + required_columns = [ + 'source_id', 'source_name', 'source_type', 'iso3_country', 'original_inventory_sector', + 'start_time', 'end_time', 'lat', 'lon', 'geometry_ref', 'gas', 'emissions_quantity', + 'temporal_granularity', 'activity', 'activity_units', 'emissions_factor', + 'emissions_factor_units', 'capacity', 'capacity_units', 'capacity_factor' + ] + dtype_map = {'source_type': 'VARCHAR', 'capacity': 'integer'} + + with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader: + for chunk in reader: + missing_columns = set(required_columns) - set(chunk.columns) + for col in missing_columns: + chunk[col] = np.nan + + chunk = chunk[required_columns] + chunk['source_type'] = chunk['source_type'].astype(str) + + loader.export( + chunk, + schema_name, + table_name, + index=False, + if_exists='append', + dtype=dtype_map + ) diff --git a/global-api/importer-mage/cc-mage/data_exporters/load_custom_polygon_nigeria.py b/global-api/importer-mage/cc-mage/data_exporters/load_custom_polygon_nigeria.py new file mode 100644 index 00000000..87b73196 --- /dev/null +++ b/global-api/importer-mage/cc-mage/data_exporters/load_custom_polygon_nigeria.py @@ -0,0 +1,49 @@ +import pandas as pd +import geopandas as gpd +from shapely import wkt + +from mage_ai.settings.repo import get_repo_path +from mage_ai.io.config import ConfigFileLoader +from mage_ai.io.postgres import Postgres +from pandas import DataFrame +from os import path + +if 'data_exporter' not in globals(): + from mage_ai.data_preparation.decorators import data_exporter + + +@data_exporter +def export_data_to_postgres(df: DataFrame, **kwargs) -> None: + """ + Load locode for Nigeria + """ + schema_name = 'raw_data' # Specify the name of the schema to export data to + table_name = 'custom_polygon_staging' # Specify the name of the table to export data to + config_path = path.join(get_repo_path(), 'io_config.yaml') + config_profile = 'default' + + url = "https://datacatalogfiles.worldbank.org/ddh-published/0039368/DR0048906/ngaadmbndaadm2osgof20170222.geojson?versionId=2023-01-19T03:44:38.1621981Z" + gdf_ng = gpd.read_file(url) + gdf_ng = gdf_ng.to_crs("EPSG:4326") + + locode_mapping = {'Abuja Municipal': 'NG ABV', 'Lagos Island': 'NG LOS'} + gdf_ng['locode'] = gdf_ng['admin2Name'].map(locode_mapping) + #gdf_ng = gdf_ng.explode() + #gdf_ng = gdf_ng[['locode', 'geometry']] + #gdf_ng[['bbox_west', 'bbox_south', 'bbox_east', 'bbox_north']] = gdf_ng.geometry.bounds + #gdf_ng['lat'] = gdf_ng.geometry.centroid.y + #gdf_ng['lon'] = gdf_ng.geometry.centroid.x + #gdf_ng['area'] = gdf_ng.geometry.area + #gdf_ng['type'] = 'custom' + #gdf_ng['polygon_wkt'] = gdf_ng['geometry'].apply(lambda x: x.wkt) + #df_ng = pd.DataFrame(gdf_ng) + + # with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader: + # loader.export( + # df, + # schema_name, + # table_name, + # index=False, # Specifies whether to include index in exported table + # if_exists='replace', # Specify resolution policy if table name already exists + # ) + return gdf_ng diff --git a/global-api/importer-mage/cc-mage/data_exporters/load_custom_polygons_mendoza.py b/global-api/importer-mage/cc-mage/data_exporters/load_custom_polygons_mendoza.py new file mode 100644 index 00000000..18b8168d --- /dev/null +++ 
b/global-api/importer-mage/cc-mage/data_exporters/load_custom_polygons_mendoza.py
@@ -0,0 +1,56 @@
+import geopandas as gpd
+from shapely.geometry import Polygon
+from shapely import wkt
+import pandas as pd
+
+from mage_ai.settings.repo import get_repo_path
+from mage_ai.io.config import ConfigFileLoader
+from mage_ai.io.postgres import Postgres
+from pandas import DataFrame
+from os import path
+
+if 'data_exporter' not in globals():
+    from mage_ai.data_preparation.decorators import data_exporter
+
+@data_exporter
+def export_data_to_postgres(df: DataFrame, **kwargs) -> None:
+
+    # read in the file and load into database
+    gdf = gpd.read_file('raw_data/custom_polygons/extracted/Limites Ciudad/Limites Ciudad.shp')
+    gdf.crs = "EPSG:22192"
+    gdf = gdf.to_crs("EPSG:4326")
+    linestring = gdf['geometry'].iloc[0]
+    polygon = Polygon(linestring)
+    polygon_wkt = wkt.dumps(polygon)
+    # shapely bounds are (minx, miny, maxx, maxy), i.e. (west, south, east, north)
+    bbox = linestring.bounds
+    bbox_west, bbox_south, bbox_east, bbox_north = bbox
+    center_point = linestring.centroid
+
+    # Create DataFrame
+    data = {
+        'locode': ['AR MDZ'],
+        'bbox_north': [bbox_north],
+        'bbox_south': [bbox_south],
+        'bbox_east': [bbox_east],
+        'bbox_west': [bbox_west],
+        'center_lat': [center_point.y],
+        'center_lon': [center_point.x],
+        'polygon_wkt': [polygon_wkt]
+    }
+
+    df = pd.DataFrame(data)
+
+
+    schema_name = 'raw_data'  # Specify the name of the schema to export data to
+    table_name = 'custom_polygon_staging'  # Specify the name of the table to export data to
+    config_path = path.join(get_repo_path(), 'io_config.yaml')
+    config_profile = 'default'
+
+    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
+        loader.export(
+            df,
+            schema_name,
+            table_name,
+            index=False,  # Specifies whether to include index in exported table
+            if_exists='replace',  # Specify resolution policy if table name already exists
+        )
diff --git a/global-api/importer-mage/cc-mage/data_exporters/load_osm_polygons_delta.sql b/global-api/importer-mage/cc-mage/data_exporters/load_osm_polygons_delta.sql
new file mode 100644
index 00000000..e3d2251c
--- /dev/null
+++ b/global-api/importer-mage/cc-mage/data_exporters/load_osm_polygons_delta.sql
@@ -0,0 +1,27 @@
+DELETE FROM raw_data.osm_city_polygon
+WHERE locode IN (SELECT locode FROM raw_data.osm_polygon_staging_delta);
+
+INSERT INTO raw_data.osm_city_polygon
+SELECT DISTINCT locode,
+       osmid,
+       ST_SetSRID(ST_GeomFromText(geometry), 4326) as geometry,
+       ST_GeometryType(ST_GeomFromText(geometry)) AS geometry_type,
+       cast(bbox_north as numeric) as bbox_north,
+       cast(bbox_south as numeric) as bbox_south,
+       cast(bbox_east as numeric) as bbox_east,
+       cast(bbox_west as numeric) as bbox_west,
+       cast(place_id as numeric) as place_id,
+       osm_type,
+       cast(osm_id as numeric) osm_id,
+       cast(lat as DOUBLE PRECISION) as lat,
+       cast(lon as DOUBLE PRECISION) as lon,
+       "_class" as geom_class,
+       "_type" as geom_type,
+       cast(place_rank as int) as place_rank,
+       cast(importance as numeric) as importance,
+       addresstype,
+       "_name" as geom_name,
+       display_name
+FROM raw_data.osm_polygon_staging_delta;
+
+DROP TABLE raw_data.osm_polygon_staging_delta;
\ No newline at end of file
diff --git a/global-api/importer-mage/cc-mage/data_exporters/load_osmid_polygons.py b/global-api/importer-mage/cc-mage/data_exporters/load_osmid_polygons.py
new file mode 100644
index 00000000..62e6d2be
--- /dev/null
+++ b/global-api/importer-mage/cc-mage/data_exporters/load_osmid_polygons.py
@@ -0,0 +1,56 @@
+import pandas as pd
+import osmnx as ox
+
+from mage_ai.settings.repo import get_repo_path
+from mage_ai.io.config import ConfigFileLoader
+from mage_ai.io.postgres import Postgres
+from pandas import DataFrame
+from os import path
+
+if 'data_exporter' not in globals():
+    from mage_ai.data_preparation.decorators import data_exporter
+
+
+@data_exporter
+def export_data_to_postgres(df: DataFrame, **kwargs) -> None:
+    """
+    Template for exporting data to a PostgreSQL database.
+    Specify your configuration settings in 'io_config.yaml'.
+
+    Docs: https://docs.mage.ai/design/data-loading#postgresql
+    """
+    schema_name = 'raw_data'  # Specify the name of the schema to export data to
+    table_name = 'osm_polygons_generator'  # Specify the name of the table to export data to
+    config_path = path.join(get_repo_path(), 'io_config.yaml')
+    config_profile = 'default'
+
+    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
+        loader.export(
+            df,
+            schema_name,
+            table_name,
+            index=False,  # Specifies whether to include index in exported table
+            if_exists='append',  # Specify resolution policy if table name already exists
+        )
+
+def process_record(record):
+    osmid = record['osmid']
+    locode = record['locode']
+    gdf = ox.geocode_to_gdf(osmid, by_osmid=True)
+    gdf['geometry'] = gdf['geometry'].apply(lambda geom: geom.wkt)
+    gdf_dict = gdf.to_dict('records')
+    df = pd.DataFrame(gdf_dict)
+    df['locode'] = locode
+    return df
+
+def csv_generator(url):
+    df = pd.read_csv(url).head(3)
+    for index, row in df.iterrows():
+        yield process_record(row.to_dict())
+
+url = 'https://github.com/Open-Earth-Foundation/locode-to-osmid/blob/main/locode_to_osmid.csv?raw=true'
+data_generator = csv_generator(url)
+
+# Iterate over the generator and export each record to PostgreSQL
+for record in data_generator:
+    export_data_to_postgres(record)
\ No newline at end of file
diff --git a/global-api/importer-mage/cc-mage/data_exporters/load_raw_data_osm.py b/global-api/importer-mage/cc-mage/data_exporters/load_raw_data_osm.py
new file mode 100644
index 00000000..bf3b88eb
--- /dev/null
+++ b/global-api/importer-mage/cc-mage/data_exporters/load_raw_data_osm.py
@@ -0,0 +1,31 @@
+from mage_ai.settings.repo import get_repo_path
+from mage_ai.io.config import ConfigFileLoader
+from mage_ai.io.postgres import Postgres
+from pandas import DataFrame
+from os import path
+
+if 'data_exporter' not in globals():
+    from mage_ai.data_preparation.decorators import data_exporter
+
+
+@data_exporter
+def export_data_to_postgres(df: DataFrame, **kwargs) -> None:
+    """
+    Template for exporting data to a PostgreSQL database.
+    Specify your configuration settings in 'io_config.yaml'.
+
+    Docs: https://docs.mage.ai/design/data-loading#postgresql
+    """
+    schema_name = 'raw_data'  # Specify the name of the schema to export data to
+    table_name = 'osm_polygons'  # Specify the name of the table to export data to
+    config_path = path.join(get_repo_path(), 'io_config.yaml')
+    config_profile = 'default'
+
+    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
+        loader.export(
+            df,
+            schema_name,
+            table_name,
+            index=False,  # Specifies whether to include index in exported table
+            if_exists='replace',  # Specify resolution policy if table name already exists
+        )
diff --git a/global-api/importer-mage/cc-mage/data_exporters/s3_sesco.py b/global-api/importer-mage/cc-mage/data_exporters/s3_sesco.py
new file mode 100644
index 00000000..479ca9fa
--- /dev/null
+++ b/global-api/importer-mage/cc-mage/data_exporters/s3_sesco.py
@@ -0,0 +1,21 @@
+import requests
+
+from mage_ai.settings.repo import get_repo_path
+from mage_ai.io.config import ConfigFileLoader
+from mage_ai.io.s3 import S3
+from os import path
+
+if 'data_loader' not in globals():
+    from mage_ai.data_preparation.decorators import data_loader
+
+
+@data_loader
+def load_data_from_url_and_upload_to_s3(url: str, bucket_name: str, object_key: str, **kwargs) -> None:
+    """
+    Load data from a URL and upload it to an S3 bucket.
+    """
+    response = requests.get(url)
+    response.raise_for_status()
+
+    with open('extracted_sesco_activity_region', 'wb') as f:
+        f.write(response.content)
+
+    config_path = path.join(get_repo_path(), 'io_config.yaml')
+    config_profile = 'default'
+
+    # bucket_name and object_key are taken from the function arguments above
+    s3 = S3.with_config(ConfigFileLoader(config_path, config_profile))
+    s3.export('local/argentina/sesco/extracted_sesco_activity_region', bucket_name, object_key)
+
+    print(f"File uploaded to S3 bucket {bucket_name} with key {object_key}")
diff --git a/global-api/importer-mage/cc-mage/data_loaders/__init__.py b/global-api/importer-mage/cc-mage/data_loaders/__init__.py
new file mode 100755
index 00000000..e69de29b
diff --git a/global-api/importer-mage/cc-mage/data_loaders/delta_current_locode_osmid.py b/global-api/importer-mage/cc-mage/data_loaders/delta_current_locode_osmid.py
new file mode 100644
index 00000000..9e340e43
--- /dev/null
+++ b/global-api/importer-mage/cc-mage/data_loaders/delta_current_locode_osmid.py
@@ -0,0 +1,29 @@
+from mage_ai.settings.repo import get_repo_path
+from mage_ai.io.config import ConfigFileLoader
+from mage_ai.io.postgres import Postgres
+from os import path
+if 'data_loader' not in globals():
+    from mage_ai.data_preparation.decorators import data_loader
+if 'test' not in globals():
+    from mage_ai.data_preparation.decorators import test
+
+
+@data_loader
+def load_data_from_postgres(*args, **kwargs):
+    """
+    Template for loading data from a PostgreSQL database.
+    Specify your configuration settings in 'io_config.yaml'.
+ + Docs: https://docs.mage.ai/design/data-loading#postgresql + """ + query = 'select distinct locode, osmid from raw_data.osm_city_polygon where osmid is not null' # Specify your SQL query here + config_path = path.join(get_repo_path(), 'io_config.yaml') + config_profile = 'default' + + with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader: + return loader.load(query) + + +@test +def test_output(output, *args) -> None: + assert output is not None, 'The output is undefined' diff --git a/global-api/importer-mage/cc-mage/data_loaders/extract_edgar_dataset.py b/global-api/importer-mage/cc-mage/data_loaders/extract_edgar_dataset.py new file mode 100644 index 00000000..ace700bf --- /dev/null +++ b/global-api/importer-mage/cc-mage/data_loaders/extract_edgar_dataset.py @@ -0,0 +1,39 @@ +import zipfile +import os +import io +import requests + +if 'data_loader' not in globals(): + from mage_ai.data_preparation.decorators import data_loader +if 'test' not in globals(): + from mage_ai.data_preparation.decorators import test + + +@data_loader +def load_data_from_api(*args, **kwargs): + """ + Load data from url in edgar base directory + https://jeodpp.jrc.ec.europa.eu/ftp/jrc-opendata/EDGAR/datasets/v80_FT2022_GHG/ + """ + + edgar_gas = kwargs['edgar_gas'] + edgar_year = kwargs['edgar_year'] + edgar_industry = kwargs['edgar_industry'] + + + url = f'https://jeodpp.jrc.ec.europa.eu/ftp/jrc-opendata/EDGAR/datasets/v80_FT2022_GHG/{edgar_gas}/{edgar_industry}/emi_nc/v8.0_FT2022_GHG_{edgar_gas}_{edgar_year}_{edgar_industry}_emi_nc.zip' + response = requests.get(url) + extract_dir = f'raw_data/edgar/extracted/' + + with zipfile.ZipFile(io.BytesIO(response.content)) as zip_ref: + zip_ref.extractall(extract_dir) + + extracted_files = zip_ref.namelist() + extracted_not_readme_files = [file for file in extracted_files if '_readme.html' not in file] + extracted_file_path = os.path.join(extract_dir, extracted_not_readme_files[0]) + + return extracted_file_path + +@test +def test_output(output, *args) -> None: + assert output is not None, 'The file could not be downloaded' diff --git a/global-api/importer-mage/cc-mage/data_loaders/extract_mendoza_polygon.py b/global-api/importer-mage/cc-mage/data_loaders/extract_mendoza_polygon.py new file mode 100644 index 00000000..d83f04db --- /dev/null +++ b/global-api/importer-mage/cc-mage/data_loaders/extract_mendoza_polygon.py @@ -0,0 +1,31 @@ +import zipfile +import os +import geopandas as gpd + +from mage_ai.io.file import FileIO +if 'data_loader' not in globals(): + from mage_ai.data_preparation.decorators import data_loader +if 'test' not in globals(): + from mage_ai.data_preparation.decorators import test + + +@data_loader +def load_data_from_file(*args, **kwargs): + """ + unzip file to extracted folder + """ + filepath = 'raw_data/custom_polygons/Limites Ciudad-001.zip' + extract_dir = 'raw_data/custom_polygons/extracted/' + + with zipfile.ZipFile(filepath, 'r') as zip_ref: + zip_ref.extractall(extract_dir) + + return 1 + + +@test +def test_output(output, *args) -> None: + """ + Template code for testing the output of the block. 
+ """ + assert output is not None, 'The output is undefined' \ No newline at end of file diff --git a/global-api/importer-mage/cc-mage/data_loaders/indec_read_s3.py b/global-api/importer-mage/cc-mage/data_loaders/indec_read_s3.py new file mode 100644 index 00000000..abad6dce --- /dev/null +++ b/global-api/importer-mage/cc-mage/data_loaders/indec_read_s3.py @@ -0,0 +1,37 @@ +from mage_ai.settings.repo import get_repo_path +from mage_ai.io.config import ConfigFileLoader +from mage_ai.io.s3 import S3 +from pandas import DataFrame +from os import path +if 'data_loader' not in globals(): + from mage_ai.data_preparation.decorators import data_loader +if 'test' not in globals(): + from mage_ai.data_preparation.decorators import test + + +@data_loader +def load_from_s3_bucket(*args, **kwargs) -> DataFrame: + """ + Template for loading data from a S3 bucket. + Specify your configuration settings in 'io_config.yaml'. + + Docs: https://docs.mage.ai/design/data-loading#s3 + """ + config_path = path.join(get_repo_path(), 'io_config.yaml') + config_profile = 'default' + + bucket_name = 'global-api-raw-data' + object_key = 'local/argentina/indec/indec_industrial_waste.xls' + + return S3.with_config(ConfigFileLoader(config_path, config_profile)).load( + bucket_name, + object_key, + ) + + +@test +def test_output(output, *args) -> None: + """ + Template code for testing the output of the block. + """ + assert output is not None, 'The output is undefined' diff --git a/global-api/importer-mage/cc-mage/data_loaders/load_climatetrace_files.py b/global-api/importer-mage/cc-mage/data_loaders/load_climatetrace_files.py new file mode 100644 index 00000000..862a3e3d --- /dev/null +++ b/global-api/importer-mage/cc-mage/data_loaders/load_climatetrace_files.py @@ -0,0 +1,32 @@ +import zipfile +import os +import io +import requests + +if 'data_loader' not in globals(): + from mage_ai.data_preparation.decorators import data_loader +if 'test' not in globals(): + from mage_ai.data_preparation.decorators import test + + +@data_loader +def load_data_from_api(*args, **kwargs): + """ + Load data from url in climatetrace directory + """ + climatetrace_sector = kwargs['climatetrace_sector'] + + url = f'https://downloads.climatetrace.org/v02/sector_packages/{climatetrace_sector}.zip' + response = requests.get(url) + extract_dir = f'raw_data/climatetrace/extracted/{climatetrace_sector}/' + + with zipfile.ZipFile(io.BytesIO(response.content)) as zip_ref: + zip_ref.extractall(extract_dir) + + extracted_files = zip_ref.namelist() + + return extracted_files + +@test +def test_output(output, *args) -> None: + assert output is not None, 'The file could not be downloaded' diff --git a/global-api/importer-mage/cc-mage/data_loaders/load_climatetrace_gpc_sector.sql b/global-api/importer-mage/cc-mage/data_loaders/load_climatetrace_gpc_sector.sql new file mode 100644 index 00000000..4f1adfbe --- /dev/null +++ b/global-api/importer-mage/cc-mage/data_loaders/load_climatetrace_gpc_sector.sql @@ -0,0 +1,10 @@ + +CREATE TABLE IF NOT EXISTS raw_data.climatetrace_sector_description AS +SELECT * FROM ( + VALUES + ('road-transportation', 'II.1.1'), + ('domestic-aviation', 'II.4.3'), + ('domestic-shipping', 'II.3.3'), + ('international-shipping', 'II.3.3'), + ('international-aviation', 'II.4.3') +) AS climatetrace_sector (original_inventory_sector, gpc_refno); diff --git a/global-api/importer-mage/cc-mage/data_loaders/load_climatetrace_sector.sql b/global-api/importer-mage/cc-mage/data_loaders/load_climatetrace_sector.sql new file mode 100644 index 
00000000..e69de29b diff --git a/global-api/importer-mage/cc-mage/data_loaders/load_edgar_dataset.py b/global-api/importer-mage/cc-mage/data_loaders/load_edgar_dataset.py new file mode 100644 index 00000000..4155f253 --- /dev/null +++ b/global-api/importer-mage/cc-mage/data_loaders/load_edgar_dataset.py @@ -0,0 +1,25 @@ +import xarray as xr + +from mage_ai.io.file import FileIO +if 'data_loader' not in globals(): + from mage_ai.data_preparation.decorators import data_loader +if 'test' not in globals(): + from mage_ai.data_preparation.decorators import test + + +@data_loader +def load_data_from_file(data, *args, **kwargs): + """ + Read in the file path from previous extract + to process for raw_data table + """ + filepath = data + ds = xr.open_dataset(filepath) + df_emissions = ds['emissions'].to_dataframe().reset_index() + + return df_emissions + + +@test +def test_output(output, *args) -> None: + assert output is not None, 'The output is undefined' diff --git a/global-api/importer-mage/cc-mage/data_loaders/load_locode_osmid.py b/global-api/importer-mage/cc-mage/data_loaders/load_locode_osmid.py new file mode 100644 index 00000000..7673f43e --- /dev/null +++ b/global-api/importer-mage/cc-mage/data_loaders/load_locode_osmid.py @@ -0,0 +1,26 @@ +import io +import pandas as pd +import requests +if 'data_loader' not in globals(): + from mage_ai.data_preparation.decorators import data_loader +if 'test' not in globals(): + from mage_ai.data_preparation.decorators import test + + +@data_loader +def load_data_from_api(*args, **kwargs): + """ + Template for loading data from API + """ + url = 'https://github.com/Open-Earth-Foundation/locode-to-osmid/blob/main/locode_to_osmid.csv?raw=true' + response = requests.get(url) + + return pd.read_csv(io.StringIO(response.text)).head(50) + + +@test +def test_output(output, *args) -> None: + """ + Template code for testing the output of the block. 
+ """ + assert output is not None, 'The output is undefined' diff --git a/global-api/importer-mage/cc-mage/data_loaders/load_osm_polygons.py b/global-api/importer-mage/cc-mage/data_loaders/load_osm_polygons.py new file mode 100644 index 00000000..d7854cdd --- /dev/null +++ b/global-api/importer-mage/cc-mage/data_loaders/load_osm_polygons.py @@ -0,0 +1,47 @@ +from mage_ai.settings.repo import get_repo_path +from mage_ai.io.config import ConfigFileLoader +from mage_ai.io.postgres import Postgres +from os import path +if 'data_loader' not in globals(): + from mage_ai.data_preparation.decorators import data_loader +if 'test' not in globals(): + from mage_ai.data_preparation.decorators import test + + +@data_loader +def load_data_from_postgres(*args, **kwargs): + schema_name = 'raw_data' # Specify the name of the schema to export data to + table_name = 'osm_polygons_generator' # Specify the name of the table to export data to + config_path = path.join(get_repo_path(), 'io_config.yaml') + config_profile = 'default' + + with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader: + loader.export( + df, + schema_name, + table_name, + index=False, # Specifies whether to include index in exported table + if_exists='append', # Specify resolution policy if table name already exists + ) + +def process_record(record): + osmid = record['osmid'] + locode = record['locode'] + gdf = ox.geocode_to_gdf(osmid, by_osmid=True) + gdf['geometry'] = gdf['geometry'].apply(lambda geom: geom.wkt) + gdf_dict = gdf.to_dict('records') + df = pd.DataFrame(gdf_dict) + df['locode'] = locode + return df + +def csv_generator(url): + df = pd.read_csv(url).head(3) + for index, row in df.iterrows(): + yield process_record(row.to_dict()) + +url = 'https://github.com/Open-Earth-Foundation/locode-to-osmid/blob/main/locode_to_osmid.csv?raw=true' +data_generator = csv_generator(url) + +# Iterate over the generator and export each record to PostgreSQL +for record in data_generator: + export_data_to_postgres(record) diff --git a/global-api/importer-mage/cc-mage/data_loaders/load_sesco.py b/global-api/importer-mage/cc-mage/data_loaders/load_sesco.py new file mode 100644 index 00000000..168d6eb5 --- /dev/null +++ b/global-api/importer-mage/cc-mage/data_loaders/load_sesco.py @@ -0,0 +1,26 @@ +import io +import pandas as pd +import requests +if 'data_loader' not in globals(): + from mage_ai.data_preparation.decorators import data_loader +if 'test' not in globals(): + from mage_ai.data_preparation.decorators import test + + +@data_loader +def load_data_from_api(*args, **kwargs): + """ + Template for loading data from API + """ + url = 'http://datos.energia.gob.ar/dataset/5bdc436c-60d4-4c86-98ab-59834d047700/resource/f0e4e10a-e4b8-44e6-bd16-763a43742107/download/ventas-excluye-ventas-a-empresas-del-sector-.csv' + response = requests.get(url) + + return pd.read_csv(io.StringIO(response.text), sep=',') + + +@test +def test_output(output, *args) -> None: + """ + Template code for testing the output of the block. 
+ """ + assert output is not None, 'The output is undefined' diff --git a/global-api/importer-mage/cc-mage/data_loaders/update_climatetrace_sector.sql b/global-api/importer-mage/cc-mage/data_loaders/update_climatetrace_sector.sql new file mode 100644 index 00000000..67286681 --- /dev/null +++ b/global-api/importer-mage/cc-mage/data_loaders/update_climatetrace_sector.sql @@ -0,0 +1,56 @@ +--DROP TABLE IF EXISTS raw_data.climatetrace_source_staging; + +CREATE TABLE IF NOT EXISTS raw_data.climatetrace_source_emissions AS +SELECT source_id, + source_name, + source_type, + iso3_country, + original_inventory_sector, + EXTRACT(YEAR FROM CAST(start_time as DATE)) AS year, + ST_MakePoint(lon, lat) as geometry, + lat, + lon, + gas, + temporal_granularity, + activity, + activity_units, + emissions_factor, + emissions_factor_units, + capacity, + capacity_units, + capacity_factor +FROM raw_data.climatetrace_source_staging +WHERE emissions_quantity > 0 +; +DELETE FROM raw_data.climatetrace_source_emissions +WHERE (original_inventory_sector, year) IN +(SELECT DISTINCT original_inventory_sector, year +FROM raw_data.climatetrace_source_staging) +; + +INSERT INTO raw_data.climatetrace_source_emissions +SELECT source_id, + source_name, + source_type, + iso3_country, + original_inventory_sector, + EXTRACT(YEAR FROM CAST(start_time as DATE)) AS year, + ST_MakePoint(lon, lat) as geometry, + lat, + lon, + gas, + temporal_granularity, + activity, + activity_units, + emissions_factor, + emissions_factor_units, + capacity, + capacity_units, + capacity_factor +FROM raw_data.climatetrace_source_staging +WHERE emissions_quantity > 0 +; + +CREATE INDEX IF NOT EXISTS i +ON raw_data.climatetrace_source_emissions (original_inventory_sector, year); +; \ No newline at end of file diff --git a/global-api/importer-mage/cc-mage/data_loaders/update_custom_polygons_mendoza.sql b/global-api/importer-mage/cc-mage/data_loaders/update_custom_polygons_mendoza.sql new file mode 100644 index 00000000..63c06f45 --- /dev/null +++ b/global-api/importer-mage/cc-mage/data_loaders/update_custom_polygons_mendoza.sql @@ -0,0 +1,23 @@ +CREATE TABLE IF NOT EXISTS raw_data.custom_polygon AS +SELECT locode, bbox_north, bbox_south, bbox_east, bbox_west, center_lat, center_lon, ST_GeomFromText(polygon_wkt, 4326) as geometry +FROM raw_data.custom_polygon_staging; + +DELETE FROM raw_data.custom_polygon +WHERE locode IN (SELECT locode FROM raw_data.custom_polygon_staging); + +INSERT INTO raw_data.custom_polygon +SELECT locode, bbox_north, bbox_south, bbox_east, bbox_west, center_lat, center_lon, ST_GeomFromText(polygon_wkt, 4326) as geometry +FROM raw_data.custom_polygon_staging; + +DROP TABLE IF EXISTS raw_data.custom_polygon_staging; + +UPDATE raw_data.osm_city_polygon AS tgt +SET + geometry = src.geometry, + bbox_north = src.bbox_north, + bbox_south = src.bbox_south, + bbox_east = src.bbox_east, + bbox_west = src.bbox_west, + geom_type = 'custom' +FROM raw_data.custom_polygon AS src +WHERE tgt.locode = src.locode; \ No newline at end of file diff --git a/global-api/importer-mage/cc-mage/dbt/profiles.yml b/global-api/importer-mage/cc-mage/dbt/profiles.yml new file mode 100755 index 00000000..90599f89 --- /dev/null +++ b/global-api/importer-mage/cc-mage/dbt/profiles.yml @@ -0,0 +1,9 @@ +# https://docs.getdbt.com/docs/core/connect-data-platform/profiles.yml + +base: + outputs: + + dev: + type: duckdb + + target: dev diff --git a/global-api/importer-mage/cc-mage/extensions/__init__.py b/global-api/importer-mage/cc-mage/extensions/__init__.py new file 
diff --git a/global-api/importer-mage/cc-mage/extensions/__init__.py b/global-api/importer-mage/cc-mage/extensions/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/global-api/importer-mage/cc-mage/interactions/__init__.py b/global-api/importer-mage/cc-mage/interactions/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/global-api/importer-mage/cc-mage/io_config.yaml b/global-api/importer-mage/cc-mage/io_config.yaml new file mode 100755 index 00000000..e876c123 --- /dev/null +++ b/global-api/importer-mage/cc-mage/io_config.yaml @@ -0,0 +1,138 @@ +version: 0.1.1 +default: + # Default profile created for data IO access. + # Add your credentials for the source you use, and delete the rest. + # AWS + AWS_ACCESS_KEY_ID: "{{ mage_secret_var('AWS_ACCESS_KEY_ID') }}" + AWS_SECRET_ACCESS_KEY: "{{ mage_secret_var('AWS_SECRET_ACCESS_KEY') }}" + #AWS_SESSION_TOKEN: session_token (Used to generate Redshift credentials) + AWS_REGION: us-east-1 + # Algolia + ALGOLIA_APP_ID: app_id + ALGOLIA_API_KEY: api_key + ALGOLIA_INDEX_NAME: index_name + # Azure + AZURE_CLIENT_ID: "{{ env_var('AZURE_CLIENT_ID') }}" + AZURE_CLIENT_SECRET: "{{ env_var('AZURE_CLIENT_SECRET') }}" + AZURE_STORAGE_ACCOUNT_NAME: "{{ env_var('AZURE_STORAGE_ACCOUNT_NAME') }}" + AZURE_TENANT_ID: "{{ env_var('AZURE_TENANT_ID') }}" + # Chroma + CHROMA_COLLECTION: collection_name + CHROMA_PATH: path + # Clickhouse + CLICKHOUSE_DATABASE: default + CLICKHOUSE_HOST: host.docker.internal + CLICKHOUSE_INTERFACE: http + CLICKHOUSE_PASSWORD: null + CLICKHOUSE_PORT: 8123 + CLICKHOUSE_USERNAME: null + # Druid + DRUID_HOST: hostname + DRUID_PASSWORD: password + DRUID_PATH: /druid/v2/sql/ + DRUID_PORT: 8082 + DRUID_SCHEME: http + DRUID_USER: user + # DuckDB + DUCKDB_DATABASE: my_db + DUCKDB_SCHEMA: main + # Google + GOOGLE_SERVICE_ACC_KEY: + type: service_account + project_id: project-id + private_key_id: key-id + private_key: "-----BEGIN PRIVATE KEY-----\nyour_private_key\n-----END PRIVATE KEY-----" + client_email: your_service_account_email + auth_uri: "https://accounts.google.com/o/oauth2/auth" + token_uri: "https://accounts.google.com/o/oauth2/token" + auth_provider_x509_cert_url: "https://www.googleapis.com/oauth2/v1/certs" + client_x509_cert_url: "https://www.googleapis.com/robot/v1/metadata/x509/your_service_account_email" + GOOGLE_SERVICE_ACC_KEY_FILEPATH: "/path/to/your/service/account/key.json" + GOOGLE_LOCATION: US # Optional + # MongoDB + # Specify either the connection string or the (host, password, user, port) to connect to MongoDB.
+ MONGODB_CONNECTION_STRING: "mongodb://{username}:{password}@{host}:{port}/" + MONGODB_HOST: host + MONGODB_PORT: 27017 + MONGODB_USER: user + MONGODB_PASSWORD: password + MONGODB_DATABASE: database + MONGODB_COLLECTION: collection + # MSSQL + MSSQL_DATABASE: database + MSSQL_SCHEMA: schema + MSSQL_DRIVER: "ODBC Driver 18 for SQL Server" + MSSQL_HOST: host + MSSQL_PASSWORD: password + MSSQL_PORT: 1433 + MSSQL_USER: SA + # MySQL + MYSQL_DATABASE: database + MYSQL_HOST: host + MYSQL_PASSWORD: password + MYSQL_PORT: 3306 + MYSQL_USER: root + # Pinot + PINOT_HOST: hostname + PINOT_PASSWORD: password + PINOT_PATH: /query/sql + PINOT_PORT: 8000 + PINOT_SCHEME: http + PINOT_USER: user + # PostgreSQL + POSTGRES_CONNECT_TIMEOUT: 10 + POSTGRES_DBNAME: "{{ env_var('POSTGRES_DBNAME') }}" + #POSTGRES_SCHEMA: "{{ env_var('POSTGRES_SCHEMA') }}" + POSTGRES_USER: "{{ env_var('POSTGRES_USER') }}" + POSTGRES_PASSWORD: "{{ env_var('POSTGRES_PASSWORD') }}" + POSTGRES_HOST: "{{ env_var('POSTGRES_HOST') }}" + POSTGRES_PORT: "{{ env_var('POSTGRES_PORT') }}" + # duckdb + # DUCKDB_DATABASE: database + # DUCKDB_SCHEMA: main + # Qdrant + QDRANT_COLLECTION: collection + QDRANT_PATH: path + # Redshift + REDSHIFT_SCHEMA: public # Optional + REDSHIFT_DBNAME: redshift_db_name + REDSHIFT_HOST: redshift_cluster_id.identifier.region.redshift.amazonaws.com + REDSHIFT_PORT: 5439 + REDSHIFT_TEMP_CRED_USER: temp_username + REDSHIFT_TEMP_CRED_PASSWORD: temp_password + REDSHIFT_DBUSER: redshift_db_user + REDSHIFT_CLUSTER_ID: redshift_cluster_id + REDSHIFT_IAM_PROFILE: default + # Snowflake + SNOWFLAKE_USER: username + SNOWFLAKE_PASSWORD: password + SNOWFLAKE_ACCOUNT: account_id.region + SNOWFLAKE_DEFAULT_WH: null # Optional default warehouse + SNOWFLAKE_DEFAULT_DB: null # Optional default database + SNOWFLAKE_DEFAULT_SCHEMA: null # Optional default schema + SNOWFLAKE_PRIVATE_KEY_PASSPHRASE: null # Optional private key passphrase + SNOWFLAKE_PRIVATE_KEY_PATH: null # Optional private key path + SNOWFLAKE_ROLE: null # Optional role name + SNOWFLAKE_TIMEOUT: null # Optional timeout in seconds + # Trino + trino: + catalog: postgresql # Change this to the catalog of your choice + host: 127.0.0.1 + http_headers: + X-Something: 'mage=power' + http_scheme: http + password: mage1337 # Optional + port: 8080 + schema: core_data + session_properties: # Optional + acc01.optimize_locality_enabled: false + optimize_hash_generation: true + source: trino-cli # Optional + user: admin + verify: /path/to/your/ca.crt # Optional + # Weaviate + WEAVIATE_ENDPOINT: https://some-endpoint.weaviate.network + WEAVIATE_INSTANCE_API_KEY: YOUR-WEAVIATE-API-KEY + WEAVIATE_INFERENCE_API_KEY: YOUR-OPENAI-API-KEY + WEAVIATE_COLLECTION: collection_name + 
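Every `{{ env_var('...') }}` and `{{ mage_secret_var('...') }}` template in io_config.yaml above is resolved when a block builds a ConfigFileLoader, so credentials never live in block code. A minimal sketch of how the pipeline blocks consume this file (the `SELECT 1` query is just a connectivity check):

```python
from os import path

from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from mage_ai.settings.repo import get_repo_path

# ConfigFileLoader interpolates env_var()/mage_secret_var() templates at
# load time, so the POSTGRES_* values come from the environment (see
# dev.env) or from Mage's secret store.
config_path = path.join(get_repo_path(), 'io_config.yaml')
config = ConfigFileLoader(config_path, 'default')

with Postgres.with_config(config) as loader:
    df = loader.load('SELECT 1 AS ok')  # connectivity smoke test
    assert not df.empty
```

This is the same pattern the sensor block in sensors/test.py and the Postgres exporters in this diff rely on.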
diff --git a/global-api/importer-mage/cc-mage/markdowns/__init__.py b/global-api/importer-mage/cc-mage/markdowns/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/global-api/importer-mage/cc-mage/markdowns/cliimatetrace_description.md b/global-api/importer-mage/cc-mage/markdowns/cliimatetrace_description.md new file mode 100644 index 00000000..e854a93c --- /dev/null +++ b/global-api/importer-mage/cc-mage/markdowns/cliimatetrace_description.md @@ -0,0 +1,8 @@ +## Climate Trace + +The available sectors are: + +* transportation +* waste +* power +* buildings \ No newline at end of file diff --git a/global-api/importer-mage/cc-mage/markdowns/file_extraction.md b/global-api/importer-mage/cc-mage/markdowns/file_extraction.md new file mode 100644 index 00000000..b20c9e87 --- /dev/null +++ b/global-api/importer-mage/cc-mage/markdowns/file_extraction.md @@ -0,0 +1,8 @@ +Steps to ingest data to S3: + +Run the pipeline: __urlfile_to_s3__ + +Parameters: +* url = 'https://www.indec.gob.ar/ftp/cuadros/economia/cuadros_epi_03_23.xls' +* file_name = 'indec_industrial_waste' +* object_key = 'local/argentina/indec/indec_industrial_waste.xls' \ No newline at end of file diff --git a/global-api/importer-mage/cc-mage/markdowns/introduction.md b/global-api/importer-mage/cc-mage/markdowns/introduction.md new file mode 100644 index 00000000..786149c2 --- /dev/null +++ b/global-api/importer-mage/cc-mage/markdowns/introduction.md @@ -0,0 +1,3 @@ +# Industrial Products Statistics - INDEC + +The information originates from different sources: INDEC's own surveys, data from other government agencies, and information from business institutions. For some products (wine, beer, soft drinks, cigarettes, cement, boats), figures for registrations, sales or shipments of national products are recorded to make up for the lack of production statistics or to complement them. \ No newline at end of file
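file_extraction.md above drives the generic __urlfile_to_s3__ pipeline, which downloads a file from a URL and lands it unchanged in S3 under the given object_key. The exporter block itself does not appear in this diff; a plausible sketch is below, where the boto3 upload and the bucket variable are assumptions rather than the actual implementation:

```python
import boto3
import requests

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def extract_file_to_s3(data, *args, **kwargs):
    # url, file_name and object_key arrive as pipeline variables,
    # e.g. the INDEC workbook parameters listed in file_extraction.md.
    url = kwargs['url']
    object_key = kwargs['object_key']
    bucket = kwargs.get('bucket', 'global-api-raw-data')  # assumed name

    response = requests.get(url, timeout=60)
    response.raise_for_status()

    # boto3 reads AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY from the
    # environment, matching the credentials wired up in io_config.yaml.
    s3 = boto3.client('s3')
    s3.put_object(Bucket=bucket, Key=object_key, Body=response.content)
```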
diff --git a/global-api/importer-mage/cc-mage/metadata.yaml b/global-api/importer-mage/cc-mage/metadata.yaml new file mode 100755 index 00000000..6c185352 --- /dev/null +++ b/global-api/importer-mage/cc-mage/metadata.yaml @@ -0,0 +1,65 @@ +project_type: standalone + +variables_dir: ~/.mage_data +# remote_variables_dir: s3://bucket/path_prefix + +variables_retention_period: '90d' + +emr_config: + # You can customize the EMR cluster instance size with the two parameters + master_instance_type: 'r5.4xlarge' + slave_instance_type: 'r5.4xlarge' + + # Configure security groups for EMR cluster instances. + # The default managed security groups are ElasticMapReduce-master and ElasticMapReduce-slave + # master_security_group: 'sg-xxxxxxxxxxxx' + # slave_security_group: 'sg-yyyyyyyyyyyy' + + # If you want to ssh tunnel into EMR cluster, ec2_key_name must be configured. + # You can create a key pair in page https://console.aws.amazon.com/ec2#KeyPairs and download the key file. + # ec2_key_name: '[ec2_key_pair_name]' + +spark_config: + # Application name + app_name: 'my spark app' + # Master URL to connect to + # e.g., spark_master: 'spark://host:port', or spark_master: 'yarn' + spark_master: 'local' + # Executor environment variables + # e.g., executor_env: {'PYTHONPATH': '/home/path'} + executor_env: {} + # Jar files to be uploaded to the cluster and added to the classpath + # e.g., spark_jars: ['/home/path/example1.jar'] + spark_jars: [] + # Path where Spark is installed on worker nodes + # e.g. spark_home: '/usr/lib/spark' + spark_home: + # List of key-value pairs to be set in SparkConf + # e.g., others: {'spark.executor.memory': '4g', 'spark.executor.cores': '2'} + others: {} + # Whether to create custom SparkSession via code and set in kwargs['context'] + use_custom_session: false + # The variable name to set in kwargs['context'], + # e.g. kwargs['context']['spark'] = spark_session + custom_session_var_name: 'spark' + +# notification_config: +# alert_on: +# - trigger_success +# - trigger_failure +# slack_config: +# webhook_url: "{{ env_var('MAGE_SLACK_WEBHOOK_URL') }}" +# message_templates: +# success: +# details: > +# Pipeline uuid: {pipeline_uuid}. Trigger name: {pipeline_schedule_name}. +# Execution time: {execution_time}. +# Congratulations on a successful run! +# failure: +# details: > +# Pipeline uuid: {pipeline_uuid}. Trigger name: {pipeline_schedule_name}. +# Execution time: {execution_time}. +# Test custom message. + +project_uuid: 334d2f5a699e41ca933c535aa35fc54e +help_improve_mage: true diff --git a/global-api/importer-mage/cc-mage/pipelines/__init__.py b/global-api/importer-mage/cc-mage/pipelines/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/global-api/importer-mage/cc-mage/pipelines/city_polygons_fullload/__init__.py b/global-api/importer-mage/cc-mage/pipelines/city_polygons_fullload/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/global-api/importer-mage/cc-mage/pipelines/city_polygons_fullload/metadata.yaml b/global-api/importer-mage/cc-mage/pipelines/city_polygons_fullload/metadata.yaml new file mode 100755 index 00000000..7d863835 --- /dev/null +++ b/global-api/importer-mage/cc-mage/pipelines/city_polygons_fullload/metadata.yaml @@ -0,0 +1,66 @@ +blocks: +- all_upstream_blocks_executed: true + color: yellow + configuration: {} + downstream_blocks: + - update_raw_data_osm_polygons + executor_config: null + executor_type: local_python + has_callback: false + language: python + name: load_osm_polygons_generator + retry_config: null + status: updated + timeout: null + type: custom + upstream_blocks: [] + uuid: load_osm_polygons_generator +- all_upstream_blocks_executed: false + color: blue + configuration: + data_provider: postgres + data_provider_profile: default + dbt: {} + disable_query_preprocessing: false + export_write_policy: append + limit: 1000 + use_raw_sql: true + downstream_blocks: [] + executor_config: null + executor_type: local_python + has_callback: false + language: sql + name: update_raw_data_osm_polygons + retry_config: null + status: updated + timeout: null + type: custom + upstream_blocks: + - load_osm_polygons_generator + uuid: update_raw_data_osm_polygons +cache_block_output_in_memory: false +callbacks: [] +concurrency_config: {} +conditionals: [] +created_at: '2024-03-27 01:08:16.843963+00:00' +data_integration: null +description: Runs the full end-to-end ingestion and integration of all city polygons, including custom polygons +executor_config: {} +executor_count: 1 +executor_type: null +extensions: {} +name: city_polygons_fullload +notification_config: {} +remote_variables_dir: null +retry_config: {} +run_pipeline_in_one_process: false +settings: + triggers: null +spark_config: {} +tags: [] +type: python +uuid: city_polygons_fullload +variables: + country: MC +variables_dir: /home/src/mage_data/cc-mage +widgets: [] diff --git a/global-api/importer-mage/cc-mage/pipelines/indec_industrial_waste/__init__.py b/global-api/importer-mage/cc-mage/pipelines/indec_industrial_waste/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/global-api/importer-mage/cc-mage/pipelines/indec_industrial_waste/metadata.yaml b/global-api/importer-mage/cc-mage/pipelines/indec_industrial_waste/metadata.yaml new file mode 100755 index 00000000..cd38356b --- /dev/null +++ b/global-api/importer-mage/cc-mage/pipelines/indec_industrial_waste/metadata.yaml @@ -0,0 +1,70 @@ +blocks: +- all_upstream_blocks_executed: true + color: null + configuration: {} + downstream_blocks: [] + executor_config: null + executor_type: local_python + has_callback: false + language: markdown + name: Introduction + retry_config: null + status: updated + timeout: null + type: markdown + upstream_blocks: [] + uuid: introduction +- all_upstream_blocks_executed: true + color: null + configuration: {} + downstream_blocks: [] + executor_config: null + executor_type: local_python + has_callback: false + language: markdown + name: file_extraction + retry_config: 
null + status: updated + timeout: null + type: markdown + upstream_blocks: [] + uuid: file_extraction +- all_upstream_blocks_executed: true + color: null + configuration: {} + downstream_blocks: [] + executor_config: null + executor_type: local_python + has_callback: false + language: python + name: indec_read_s3 + retry_config: null + status: failed + timeout: null + type: data_loader + upstream_blocks: [] + uuid: indec_read_s3 +cache_block_output_in_memory: false +callbacks: [] +concurrency_config: {} +conditionals: [] +created_at: '2024-06-06 18:35:46.313733+00:00' +data_integration: null +description: null +executor_config: {} +executor_count: 1 +executor_type: null +extensions: {} +name: indec_industrial_waste +notification_config: {} +remote_variables_dir: null +retry_config: {} +run_pipeline_in_one_process: false +settings: + triggers: null +spark_config: {} +tags: [] +type: python +uuid: indec_industrial_waste +variables_dir: /home/src/mage_data/cc-mage +widgets: [] diff --git a/global-api/importer-mage/cc-mage/pipelines/ingest_climatetrace/__init__.py b/global-api/importer-mage/cc-mage/pipelines/ingest_climatetrace/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/global-api/importer-mage/cc-mage/pipelines/ingest_climatetrace/metadata.yaml b/global-api/importer-mage/cc-mage/pipelines/ingest_climatetrace/metadata.yaml new file mode 100755 index 00000000..2de604c5 --- /dev/null +++ b/global-api/importer-mage/cc-mage/pipelines/ingest_climatetrace/metadata.yaml @@ -0,0 +1,122 @@ +blocks: +- all_upstream_blocks_executed: true + color: null + configuration: {} + downstream_blocks: [] + executor_config: null + executor_type: local_python + has_callback: false + language: markdown + name: cliimatetrace_description + retry_config: null + status: updated + timeout: null + type: markdown + upstream_blocks: [] + uuid: cliimatetrace_description +- all_upstream_blocks_executed: true + color: null + configuration: {} + downstream_blocks: + - load_climatetrace_sector + executor_config: null + executor_type: local_python + has_callback: false + language: python + name: load_climatetrace_files + retry_config: null + status: executed + timeout: null + type: data_loader + upstream_blocks: [] + uuid: load_climatetrace_files +- all_upstream_blocks_executed: true + color: null + configuration: {} + downstream_blocks: + - update_climatetrace_sector + executor_config: null + executor_type: local_python + has_callback: false + language: python + name: load_climatetrace_sector + retry_config: null + status: executed + timeout: null + type: data_exporter + upstream_blocks: + - load_climatetrace_files + uuid: load_climatetrace_sector +- all_upstream_blocks_executed: true + color: null + configuration: + data_provider: postgres + data_provider_profile: default + dbt: {} + disable_query_preprocessing: true + export_write_policy: append + limit: 1000 + use_raw_sql: true + downstream_blocks: + - update_climatetrace_sector + executor_config: null + executor_type: local_python + has_callback: false + language: sql + name: load_climatetrace_gpc_sector + retry_config: null + status: executed + timeout: null + type: data_loader + upstream_blocks: [] + uuid: load_climatetrace_gpc_sector +- all_upstream_blocks_executed: true + color: null + configuration: + data_provider: postgres + data_provider_profile: default + dbt: {} + disable_query_preprocessing: false + export_write_policy: append + limit: 1000 + use_raw_sql: true + downstream_blocks: [] + executor_config: null + executor_type: local_python 
+ has_callback: false + language: sql + name: update_climatetrace_sector + retry_config: null + status: updated + timeout: null + type: data_loader + upstream_blocks: + - load_climatetrace_sector + - load_climatetrace_gpc_sector + uuid: update_climatetrace_sector +cache_block_output_in_memory: false +callbacks: [] +concurrency_config: {} +conditionals: [] +created_at: '2024-04-17 00:38:21.931927+00:00' +data_integration: null +description: This will ingest climate trace from data url downloads +executor_config: {} +executor_count: 1 +executor_type: null +extensions: {} +name: ingest_climatetrace +notification_config: {} +remote_variables_dir: null +retry_config: {} +run_pipeline_in_one_process: false +settings: + triggers: null +spark_config: {} +tags: [] +type: python +uuid: ingest_climatetrace +variables: + climatetrace_sector: waste +variables_dir: /home/src/mage_data/cc-mage +widgets: [] diff --git a/global-api/importer-mage/cc-mage/pipelines/ingest_custom_polygons/__init__.py b/global-api/importer-mage/cc-mage/pipelines/ingest_custom_polygons/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/global-api/importer-mage/cc-mage/pipelines/ingest_custom_polygons/metadata.yaml b/global-api/importer-mage/cc-mage/pipelines/ingest_custom_polygons/metadata.yaml new file mode 100755 index 00000000..a770a7c4 --- /dev/null +++ b/global-api/importer-mage/cc-mage/pipelines/ingest_custom_polygons/metadata.yaml @@ -0,0 +1,81 @@ +blocks: +- all_upstream_blocks_executed: true + color: null + configuration: {} + downstream_blocks: + - load_custom_polygons_mendoza + executor_config: null + executor_type: local_python + has_callback: false + language: python + name: extract_mendoza_polygon + retry_config: null + status: executed + timeout: null + type: data_loader + upstream_blocks: [] + uuid: extract_mendoza_polygon +- all_upstream_blocks_executed: true + color: null + configuration: {} + downstream_blocks: + - update_custom_polygons_mendoza + executor_config: null + executor_type: local_python + has_callback: false + language: python + name: load_custom_polygons_mendoza + retry_config: null + status: executed + timeout: null + type: data_exporter + upstream_blocks: + - extract_mendoza_polygon + uuid: load_custom_polygons_mendoza +- all_upstream_blocks_executed: true + color: null + configuration: + data_provider: postgres + data_provider_profile: default + dbt: {} + disable_query_preprocessing: false + export_write_policy: append + limit: 1000 + use_raw_sql: true + downstream_blocks: [] + executor_config: null + executor_type: local_python + has_callback: false + language: sql + name: update_custom_polygons_mendoza + retry_config: null + status: updated + timeout: null + type: data_loader + upstream_blocks: + - load_custom_polygons_mendoza + uuid: update_custom_polygons_mendoza +cache_block_output_in_memory: false +callbacks: [] +concurrency_config: {} +conditionals: [] +created_at: '2024-04-16 23:51:26.752010+00:00' +data_integration: null +description: This pipeline will load specific polygons not covered by osm +executor_config: {} +executor_count: 1 +executor_type: null +extensions: {} +name: ingest_custom_polygons +notification_config: {} +remote_variables_dir: null +retry_config: {} +run_pipeline_in_one_process: false +settings: + triggers: null +spark_config: {} +tags: [] +type: python +uuid: ingest_custom_polygons +variables_dir: /home/src/mage_data/cc-mage +widgets: [] diff --git a/global-api/importer-mage/cc-mage/pipelines/ingest_edgar/__init__.py 
b/global-api/importer-mage/cc-mage/pipelines/ingest_edgar/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/global-api/importer-mage/cc-mage/pipelines/ingest_edgar/metadata.yaml b/global-api/importer-mage/cc-mage/pipelines/ingest_edgar/metadata.yaml new file mode 100755 index 00000000..81f02fe8 --- /dev/null +++ b/global-api/importer-mage/cc-mage/pipelines/ingest_edgar/metadata.yaml @@ -0,0 +1,128 @@ +blocks: +- all_upstream_blocks_executed: true + color: null + configuration: {} + downstream_blocks: + - export_edgar_emissions_staging + - export_edgar_attributed_staging + executor_config: null + executor_type: local_python + has_callback: false + language: python + name: extract_edgar_dataset + retry_config: null + status: executed + timeout: null + type: data_loader + upstream_blocks: [] + uuid: extract_edgar_dataset +- all_upstream_blocks_executed: true + color: null + configuration: {} + downstream_blocks: + - update_raw_data_edgar_emissions + executor_config: null + executor_type: local_python + has_callback: false + language: python + name: export_edgar_emissions_staging + retry_config: null + status: executed + timeout: null + type: data_exporter + upstream_blocks: + - extract_edgar_dataset + uuid: export_edgar_emissions_staging +- all_upstream_blocks_executed: true + color: null + configuration: {} + downstream_blocks: + - update_raw_data_edgar_emissions + executor_config: null + executor_type: local_python + has_callback: false + language: python + name: export_edgar_attributed_staging + retry_config: null + status: executed + timeout: null + type: data_exporter + upstream_blocks: + - extract_edgar_dataset + uuid: export_edgar_attributed_staging +- all_upstream_blocks_executed: true + color: null + configuration: + data_provider: postgres + data_provider_profile: default + dbt: {} + disable_query_preprocessing: false + export_write_policy: append + limit: 1000 + use_raw_sql: true + downstream_blocks: + - update_raw_data_edgar_emissions + executor_config: null + executor_type: local_python + has_callback: false + language: sql + name: reference_edgar_sector_staging + retry_config: null + status: executed + timeout: null + type: transformer + upstream_blocks: [] + uuid: reference_edgar_sector_staging +- all_upstream_blocks_executed: true + color: null + configuration: + data_provider: postgres + data_provider_profile: default + dbt: {} + disable_query_preprocessing: true + export_write_policy: append + limit: 1000 + use_raw_sql: true + downstream_blocks: [] + executor_config: null + executor_type: local_python + has_callback: false + language: sql + name: update_raw_data_edgar_emissions + retry_config: null + status: updated + timeout: null + type: transformer + upstream_blocks: + - export_edgar_attributed_staging + - export_edgar_emissions_staging + - reference_edgar_sector_staging + uuid: update_raw_data_edgar_emissions +cache_block_output_in_memory: false +callbacks: [] +concurrency_config: {} +conditionals: [] +created_at: '2024-03-31 23:31:52.296053+00:00' +data_integration: null +description: This will ingest the Edgar data into raw_data (only needs to be run once) +executor_config: {} +executor_count: 1 +executor_type: null +extensions: {} +name: ingest_edgar +notification_config: {} +remote_variables_dir: null +retry_config: {} +run_pipeline_in_one_process: false +settings: + triggers: null +spark_config: {} +tags: [] +type: python +uuid: ingest_edgar +variables: + edgar_gas: CO2 + edgar_industry: TRO + edgar_year: 2022 +variables_dir: 
/home/src/mage_data/cc-mage +widgets: [] diff --git a/global-api/importer-mage/cc-mage/pipelines/osm_polygons_incremental/__init__.py b/global-api/importer-mage/cc-mage/pipelines/osm_polygons_incremental/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/global-api/importer-mage/cc-mage/pipelines/osm_polygons_incremental/metadata.yaml b/global-api/importer-mage/cc-mage/pipelines/osm_polygons_incremental/metadata.yaml new file mode 100755 index 00000000..e86fb536 --- /dev/null +++ b/global-api/importer-mage/cc-mage/pipelines/osm_polygons_incremental/metadata.yaml @@ -0,0 +1,105 @@ +blocks: +- all_upstream_blocks_executed: true + color: blue + configuration: + data_provider: postgres + data_provider_profile: default + dbt: {} + disable_query_preprocessing: false + export_write_policy: append + limit: 1000 + use_raw_sql: true + downstream_blocks: + - delta_current_locode_osmid + executor_config: null + executor_type: local_python + has_callback: false + language: sql + name: drop_delta_osm_polygons_staging + retry_config: null + status: executed + timeout: null + type: custom + upstream_blocks: [] + uuid: drop_delta_osm_polygons_staging +- all_upstream_blocks_executed: true + color: null + configuration: {} + downstream_blocks: + - delta_osm_polygons_generator + executor_config: null + executor_type: local_python + has_callback: false + language: python + name: delta_current_locode_osmid + retry_config: null + status: executed + timeout: null + type: data_loader + upstream_blocks: + - drop_delta_osm_polygons_staging + uuid: delta_current_locode_osmid +- all_upstream_blocks_executed: true + color: pink + configuration: {} + downstream_blocks: + - load_osm_polygons_delta + executor_config: null + executor_type: local_python + has_callback: false + language: python + name: delta_osm_polygons_generator + retry_config: null + status: executed + timeout: null + type: custom + upstream_blocks: + - delta_current_locode_osmid + uuid: delta_osm_polygons_generator +- all_upstream_blocks_executed: true + color: null + configuration: + data_provider: postgres + data_provider_profile: default + dbt: {} + disable_query_preprocessing: false + export_write_policy: append + limit: 1000 + use_raw_sql: true + downstream_blocks: [] + executor_config: null + executor_type: local_python + has_callback: false + language: sql + name: load_osm_polygons_delta + retry_config: null + status: executed + timeout: null + type: data_exporter + upstream_blocks: + - delta_osm_polygons_generator + uuid: load_osm_polygons_delta +cache_block_output_in_memory: false +callbacks: [] +concurrency_config: {} +conditionals: [] +created_at: '2024-03-28 13:18:37.614207+00:00' +data_integration: null +description: This will only run a refresh if there is a change in locode-osmid mapping +executor_config: {} +executor_count: 1 +executor_type: null +extensions: {} +name: osm_polygons_incremental +notification_config: {} +remote_variables_dir: null +retry_config: {} +run_pipeline_in_one_process: false +settings: + triggers: null +spark_config: {} +tags: [] +type: python +uuid: osm_polygons_incremental +variables_dir: /home/src/mage_data/cc-mage +widgets: [] diff --git a/global-api/importer-mage/cc-mage/pipelines/urlfile_to_s3/__init__.py b/global-api/importer-mage/cc-mage/pipelines/urlfile_to_s3/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/global-api/importer-mage/cc-mage/pipelines/urlfile_to_s3/metadata.yaml b/global-api/importer-mage/cc-mage/pipelines/urlfile_to_s3/metadata.yaml new file mode 
100755 index 00000000..713f9b89 --- /dev/null +++ b/global-api/importer-mage/cc-mage/pipelines/urlfile_to_s3/metadata.yaml @@ -0,0 +1,45 @@ +blocks: +- all_upstream_blocks_executed: true + color: null + configuration: {} + downstream_blocks: [] + executor_config: null + executor_type: local_python + has_callback: false + language: python + name: extract_file_to_s3 + retry_config: null + status: executed + timeout: null + type: data_exporter + upstream_blocks: [] + uuid: extract_file_to_s3 +cache_block_output_in_memory: false +callbacks: [] +concurrency_config: {} +conditionals: [] +created_at: '2024-06-05 19:46:21.174386+00:00' +data_integration: null +description: Generic pipeline that reads a file from a url (does not unzip) and loads + it to an s3 bucket +executor_config: {} +executor_count: 1 +executor_type: null +extensions: {} +name: urlfile_to_s3 +notification_config: {} +remote_variables_dir: null +retry_config: {} +run_pipeline_in_one_process: false +settings: + triggers: null +spark_config: {} +tags: [] +type: python +uuid: urlfile_to_s3 +variables: + file_name: 'indec_industrial_waste' + object_key: local/argentina/indec/indec_industrial_waste.xls + url: https://www.indec.gob.ar/ftp/cuadros/economia/cuadros_epi_03_23.xls +variables_dir: /home/src/mage_data/cc-mage +widgets: [] diff --git a/global-api/importer-mage/cc-mage/requirements.txt b/global-api/importer-mage/cc-mage/requirements.txt new file mode 100755 index 00000000..d9bd4680 --- /dev/null +++ b/global-api/importer-mage/cc-mage/requirements.txt @@ -0,0 +1,10 @@ +#osmnx==1.9.3 +#cython +numpy +pandas==2.2.2 +#shapely==2.0.4 +#fiona +#pyproj==3.6.* +#geopandas==0.14.4 +#xarray==2024.* +#netcdf4 \ No newline at end of file diff --git a/global-api/importer-mage/cc-mage/sensors/__init__.py b/global-api/importer-mage/cc-mage/sensors/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/global-api/importer-mage/cc-mage/sensors/test.py b/global-api/importer-mage/cc-mage/sensors/test.py new file mode 100644 index 00000000..5e450024 --- /dev/null +++ b/global-api/importer-mage/cc-mage/sensors/test.py @@ -0,0 +1,33 @@ +from mage_ai.settings.repo import get_repo_path +from mage_ai.io.config import ConfigFileLoader +from mage_ai.io.postgres import Postgres +from os import path + +if 'sensor' not in globals(): + from mage_ai.data_preparation.decorators import sensor + + +@sensor +def query_postgres_and_check_condition(*args, **kwargs) -> bool: + """ + Template code for checking the results of a Postgres query. + Specify your configuration settings in 'io_config.yaml'. 
+ + Return: True if the sensor should complete, False if it should + keep waiting + """ + + config_path = path.join(get_repo_path(), 'io_config.yaml') + config_profile = 'default' + + query = 'select 1 as test_record' # Specify your SQL query here + + with Postgres.with_config( + ConfigFileLoader(config_path, config_profile)) as loader: + df = loader.load(query) + + # Add your checks here + if df.empty: + return False + + return True diff --git a/global-api/importer-mage/cc-mage/transformers/__init__.py b/global-api/importer-mage/cc-mage/transformers/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/global-api/importer-mage/cc-mage/transformers/crs_projection.py b/global-api/importer-mage/cc-mage/transformers/crs_projection.py new file mode 100644 index 00000000..0cd1521b --- /dev/null +++ b/global-api/importer-mage/cc-mage/transformers/crs_projection.py @@ -0,0 +1,53 @@ +import geopandas as gpd +from shapely.geometry import Polygon +from shapely import wkt +import pandas as pd + +if 'transformer' not in globals(): + from mage_ai.data_preparation.decorators import transformer +if 'test' not in globals(): + from mage_ai.data_preparation.decorators import test + + +@transformer +def transform(data, *args, **kwargs): + """ + Transforms the custom polygon to a standardised format and projection + + Returns: + geometry of the custom polygon in a standardised projection + """ + # Specify your transformation logic here + gdf = gpd.read_file('raw_data/custom_polygons/extracted/Limites Ciudad/Limites Ciudad.shp') + gdf.crs = "EPSG:22192" + gdf = gdf.to_crs("EPSG:4326") + linestring = gdf['geometry'].iloc[0] + polygon = Polygon(linestring) + polygon_wkt = wkt.dumps(polygon) + # shapely bounds order is (minx, miny, maxx, maxy) = (west, south, east, north) + bbox_west, bbox_south, bbox_east, bbox_north = linestring.bounds + center_point = linestring.centroid + + # Create DataFrame + data = { + 'locode': ['AR MDZ'], + 'bbox_north': [bbox_north], + 'bbox_south': [bbox_south], + 'bbox_east': [bbox_east], + 'bbox_west': [bbox_west], + 'center_lat': [center_point.y], + 'center_lon': [center_point.x], + 'polygon_wkt': [polygon_wkt] + } + + df = pd.DataFrame(data) + + return df + + +@test +def test_output(output, *args) -> None: + """ + Template code for testing the output of the block. 
+ """ + assert output is not None, 'The output is undefined' diff --git a/global-api/importer-mage/cc-mage/transformers/osmid_to_polygon.py b/global-api/importer-mage/cc-mage/transformers/osmid_to_polygon.py new file mode 100644 index 00000000..1f33ad21 --- /dev/null +++ b/global-api/importer-mage/cc-mage/transformers/osmid_to_polygon.py @@ -0,0 +1,39 @@ +import osmnx as ox +import geopandas as gpd +import pandas as pd + +if 'transformer' not in globals(): + from mage_ai.data_preparation.decorators import transformer +if 'test' not in globals(): + from mage_ai.data_preparation.decorators import test + + +@transformer +def transform(data, *args, **kwargs): + """ + Args: + data: The output from the upstream parent block + args: The output from any additional upstream blocks (if applicable) + + Returns: + dataframe + """ + # Specify your transformation logic here + osmid = data['osmid'].iloc[0] + locode = data['locode'].iloc[0] + gdf = ox.geocode_to_gdf(osmid, by_osmid=True) + geometry_wkt = gdf.geometry.apply(lambda geom: geom.wkt) + df_geometry = pd.DataFrame(geometry_wkt, columns=['geometry']) + df_attributes = gdf.drop(columns='geometry') + df_with_geometry = pd.concat([df_attributes, df_geometry], axis=1) + df_with_geometry['locode'] = locode + + return df_with_geometry.head(1) + + +@test +def test_output(output, *args) -> None: + """ + Template code for testing the output of the block. + """ + assert output is not None, 'The output is undefined' diff --git a/global-api/importer-mage/cc-mage/transformers/reference_edgar_sector_staging.sql b/global-api/importer-mage/cc-mage/transformers/reference_edgar_sector_staging.sql new file mode 100644 index 00000000..4348147e --- /dev/null +++ b/global-api/importer-mage/cc-mage/transformers/reference_edgar_sector_staging.sql @@ -0,0 +1,32 @@ +DROP TABLE IF EXISTS raw_data.edgar_sector_description; + +CREATE TABLE raw_data.edgar_sector_description AS +SELECT * FROM ( + VALUES + ('AGS', 'Agricultural soils', '4C+4D1+4D2+4D4', '3C2+3C3+3C4+3C7', NULL), + ('AWB', 'Agricultural waste burning', '4F', '3C1b', NULL), + ('CHE', 'Chemical processes', '2B', '2B', NULL), + ('ENE', 'Power industry', '1A1a', '1A1a', NULL), + ('ENF', 'Enteric fermentation', '4A', '3A1', NULL), + ('IDE', 'Indirect emissions from NOx and NH3', '7B+7C', '5A', NULL), + ('IND', 'Combustion for manufacturing', '1A2', '1A2', 'I.3.1'), + ('IRO', 'Iron and steel production', '2C1a+2C1c+2C1d+2C1e+2C1f+2C2', '2C1+2C2', NULL), + ('MNM', 'Manure management', '4B', '3A2', NULL), + ('N2O', 'Indirect N2O emissions from agriculture', '4D3', '3C5+3C6', NULL), + ('PRO_FFF', 'Fuel exploitation (including fossil fuel fires)', '1B1a+1B2a1+1B2a2+1B2a3+1B2a4+1B2c+7A', '1B1a+1B2aiii2+1B2aiii3+1B2bi+1B2bii+5B', NULL), + ('PRO_COAL', 'Fuel exploitation COAL', '1B1a', '1B1a', NULL), + ('PRO_GAS', 'Fuel exploitation GAS', '1B2c', '1B2bi+1B2bii', NULL), + ('PRO_OIL', 'Fuel exploitation OIL', '1B2a1+1B2a2+1B2a3+1B2a4', '1B2aiii2+1B2aiii3', NULL), + ('PRU_SOL', 'Solvents and products use', '3', '2D3+2E+2F+2G', NULL), + ('RCO', 'Energy for buildings', '1A4', '1A4+1A5', NULL), + ('REF_TRF', 'Oil refineries and Transformation industry', '1A1b+1A1c+1A5b1+1B1b+1B2a5+1B2a6+1B2b5+2C1b', '1A1b+1A1ci+1A1cii+1A5biii+1B1b+1B2aiii6+1B2biii3+1B1c', NULL), + ('SWD_INC', 'Solid waste incineration', '6C+6Dhaz', '4C','III.3.1 + III.3.2'), + ('SWD_LDF', 'Solid waste landfills', '6A+6Dcom', '4A+4B', NULL), + ('TNR_Aviation_CDS', 'Aviation climbing and descent', '1A3a_CDS', '1A3a_CDS', 'II.4.3'), + ('TNR_Aviation_CRS', 'Aviation 
cruise', '1A3a_CRS', '1A3a_CRS', 'II.4.3'), + ('TNR_Aviation_LTO', 'Aviation landing&takeoff', '1A3a_LTO', '1A3a_LTO', 'II.4.3'), + ('TNR_Other', 'Railways, pipelines, off-road transport', '1A3c+1A3e', '1A3c+1A3e', NULL), + ('TNR_Ship', 'Shipping', '1A3d+1C2', '1A3d', 'II.3.3'), + ('TRO', 'Road transportation', '1A3b', '1A3b', 'II.1.1'), + ('WWT', 'Waste water handling', '6B', '4D', 'III.4.1 + III.4.2') +) AS edgar_sectors (edgar_sector, edgar_description, IPCC_1996_code, IPCC_2006_code, gpc_refno); diff --git a/global-api/importer-mage/cc-mage/transformers/update_raw_data_edgar_emissions.sql b/global-api/importer-mage/cc-mage/transformers/update_raw_data_edgar_emissions.sql new file mode 100644 index 00000000..5a8b32b4 --- /dev/null +++ b/global-api/importer-mage/cc-mage/transformers/update_raw_data_edgar_emissions.sql @@ -0,0 +1,38 @@ +CREATE TABLE IF NOT EXISTS raw_data.edgar_emissions AS +SELECT ST_SetSRID(ST_MakeEnvelope(lon - 0.05, lat - 0.05, lon + 0.05, lat + 0.05), 4326) AS geometry, + e.lat, e.lon, e.emissions, a.lat_units, a.lon_units, a.emissions_substance, cast(a.emissions_year as int) as emissions_year, a.emissions_units, a.emissions_release, a.emissions_description, s.edgar_sector, s.ipcc_2006_code, s.gpc_refno +FROM raw_data.edgar_emissions_staging e +CROSS JOIN raw_data.edgar_attributes_staging a +LEFT JOIN raw_data.edgar_sector_description s +ON a.edgar_sector = s.edgar_sector +WHERE e.emissions > 0; + +DELETE FROM raw_data.edgar_emissions +WHERE (emissions_description, emissions_substance, emissions_year) IN ( + SELECT emissions_description, emissions_substance, cast(emissions_year as int) as emissions_year + FROM raw_data.edgar_attributes_staging +); + +-- Drop indexes before bulk insertion so the reload runs faster +DROP INDEX IF EXISTS raw_data.edgar_emission_i; +DROP INDEX IF EXISTS raw_data.edgar_emission_i_poly; + +INSERT INTO raw_data.edgar_emissions +SELECT ST_SetSRID(ST_MakeEnvelope(lon - 0.05, lat - 0.05, lon + 0.05, lat + 0.05), 4326) AS geometry, + e.lat, e.lon, e.emissions, a.lat_units, a.lon_units, a.emissions_substance, cast(a.emissions_year as int) as emissions_year, a.emissions_units, a.emissions_release, a.emissions_description, s.edgar_sector, s.ipcc_2006_code, s.gpc_refno +FROM raw_data.edgar_emissions_staging e +CROSS JOIN raw_data.edgar_attributes_staging a +LEFT JOIN raw_data.edgar_sector_description s +ON a.edgar_sector = s.edgar_sector +WHERE e.emissions > 0; + +CREATE INDEX IF NOT EXISTS edgar_emission_i +ON raw_data.edgar_emissions (emissions_substance, emissions_year, emissions_description); + +CREATE INDEX IF NOT EXISTS edgar_emission_i_poly +ON raw_data.edgar_emissions USING GIST (geometry); + + +DROP TABLE IF EXISTS raw_data.edgar_attributes_staging; +DROP TABLE IF EXISTS raw_data.edgar_emissions_staging; + diff --git a/global-api/importer-mage/cc-mage/transformers/update_target_table.sql b/global-api/importer-mage/cc-mage/transformers/update_target_table.sql new file mode 100644 index 00000000..678136fd --- /dev/null +++ b/global-api/importer-mage/cc-mage/transformers/update_target_table.sql @@ -0,0 +1 @@ +-- Docs: https://docs.mage.ai/guides/sql-blocks diff --git a/global-api/importer-mage/cc-mage/utils/__init__.py b/global-api/importer-mage/cc-mage/utils/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/global-api/importer-mage/dev.env b/global-api/importer-mage/dev.env new file mode 100644 index 00000000..b2eb7dce --- /dev/null +++ b/global-api/importer-mage/dev.env @@ -0,0 +1,10 @@ +POSTGRES_PORT=5432 +POSTGRES_HOST=host.docker.internal +POSTGRES_DBNAME=ccglobal 
+POSTGRES_PASSWORD=password +POSTGRES_SCHEMA=public +POSTGRES_USER=ccglobal + +#MAGE_SLACK_WEBHOOK_URL +PROJECT_NAME=cc-mage +ENV=dev \ No newline at end of file diff --git a/global-api/importer-mage/docker-compose.yml b/global-api/importer-mage/docker-compose.yml new file mode 100644 index 00000000..e4d6ac30 --- /dev/null +++ b/global-api/importer-mage/docker-compose.yml @@ -0,0 +1,15 @@ +version: '3' +services: + server: + image: mageai/mageai:latest + command: mage start ${PROJECT_NAME} + build: + context: . + dockerfile: Dockerfile + ports: + - 6789:6789 + volumes: + - .:/home/src/ + restart: on-failure:5 + stdin_open: true + tty: true \ No newline at end of file diff --git a/global-api/routes/city_locode_endpoint_edgar.py b/global-api/routes/city_locode_endpoint_edgar.py index ae74863e..8a0a18f7 100644 --- a/global-api/routes/city_locode_endpoint_edgar.py +++ b/global-api/routes/city_locode_endpoint_edgar.py @@ -76,4 +76,4 @@ def get_emissions_by_city_and_year(locode: str, year: int, gpcReferenceNumber: s } } - return totals + return totals \ No newline at end of file
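With the importer in place, the EDGAR city endpoint touched in the last hunk can be exercised end to end. The route prefix is not visible in this hunk, so the URL shape below is illustrative only; the path parameters mirror the handler signature (locode, year, gpcReferenceNumber):

```python
import requests

# Illustrative URL shape: the real route path is defined elsewhere in
# global-api/routes/city_locode_endpoint_edgar.py, not in this hunk.
BASE = 'http://localhost:8000'
locode, year, gpc_refno = 'AR MDZ', 2022, 'II.1.1'

resp = requests.get(f'{BASE}/city/{locode}/{year}/{gpc_refno}')
resp.raise_for_status()
print(resp.json())  # the handler returns the computed emissions totals
```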