### Imports, helpers and the lineage-logging decorator

In [17]:
import pandas as pd
from marquez_client import MarquezClient
from marquez_client.models import JobType
import os
from openlineage.client import OpenLineageClient
from openlineage.client.transport.file import FileConfig, FileTransport
from openlineage.client.transport.http import HttpConfig, HttpCompression, HttpTransport
from openlineage.client.event_v2 import RunEvent, RunState, Run, Job, Dataset, DatasetEvent, JobEvent
from openlineage.client.uuid import generate_new_uuid
from datetime import datetime
from openlineage.client.facet import NominalTimeRunFacet
import logging
from functools import wraps
import numpy as np
import pandas as pd
from tqdm import tqdm
import matplotlib.pyplot as plt
import plotly.express as px
import pickle
import re
import brightway2 as bw
import bw2data as bd
import bw2io as bi
# import bw2calc as bc
# import bw2analyzer as bwa
import inspect
import getpass
from openlineage.client.uuid import generate_new_uuid
import json
import hashlib
from openlineage.client.facet import  SqlJobFacet, SchemaDatasetFacet, SchemaField, OutputStatisticsOutputDatasetFacet,SourceCodeLocationJobFacet, NominalTimeRunFacet,DataQualityMetricsInputDatasetFacet,ColumnMetric
# from openlineage.client import ZonedDateTime

In [18]:
def generate_dataset_hash(data):
    """Return the hexadecimal SHA-256 digest of the string ``data`` (UTF-8 encoded)."""
    digest = hashlib.sha256()
    digest.update(data.encode('utf-8'))
    return digest.hexdigest()

In [19]:
# Module-level lineage state shared by every call of the logging decorator below.
# NOTE(review): these two are currently never reassigned by any visible code.
last_output_dataset_name = last_output_dataset_hash = None
# Content hash (hex SHA-256 of str(data)) -> name of the dataset that produced it.
seen_datasets = dict()
# NOTE(review): the decorator builds its client with an explicit transport, so this
# config file may not be consulted — confirm before relying on it.
os.environ.update(OPENLINEAGE_CONFIG='openlineage.yml')

def _make_lineage_client(transport_type):
    """Build the OpenLineage client used to emit run events.

    Args:
        transport_type: one of "file", "http" or "console".

    Raises:
        ValueError: for any other transport type (previously this surfaced later as an
            UnboundLocalError on ``client``).
    """
    if transport_type == "file":
        file_config = FileConfig(
            log_file_path="test-lineage-file",
            append=True,
        )
        return OpenLineageClient(transport=FileTransport(file_config))
    if transport_type == "http":
        # Presumably a local Marquez instance. No auth is configured here; supply
        # credentials via environment/config rather than hardcoding an API key.
        http_config = HttpConfig(
            url="http://localhost:5000",
            endpoint="api/v1/lineage",
            timeout=5,
            verify=False,
            compression=HttpCompression.GZIP,
        )
        return OpenLineageClient(transport=HttpTransport(http_config))
    if transport_type == "console":
        # These names are not imported in the notebook's import cell, so import them here.
        # See https://openlineage.io/docs/client/python#console
        from openlineage.client.transport.console import ConsoleConfig, ConsoleTransport
        return OpenLineageClient(transport=ConsoleTransport(ConsoleConfig()))
    raise ValueError(f"Unsupported transport_type: {transport_type!r}")


def log_job_2(func):
    """Decorate ``func`` so that every call emits an OpenLineage COMPLETE run event.

    The first positional argument is treated as the input dataset and the return value
    as the output dataset. Datasets are identified by the SHA-256 hash of their ``str()``
    form. When an input's hash matches an output recorded earlier in ``seen_datasets``,
    the input reuses that output's name, which chains consecutive runs together in the
    lineage graph.

    Calls with no arguments at all are executed but not logged.

    Side effects per logged call:
    - set the module-level ``last_params``;
    - record the output hash in ``seen_datasets``;
    - send one event through the HTTP transport.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        if not args and not kwargs:
            # Nothing to treat as an input dataset: just run the function.
            # (This previously returned ``func`` itself instead of its result.)
            return func()

        namespace = 'trial_1'
        client = _make_lineage_client("http")

        description = func.__doc__
        func_name = func.__name__
        # NOTE(review): getpass.getuser() reads login-name environment variables and falls
        # back to the pwd database; it can raise where neither is available.
        executor = getpass.getuser()

        global last_params
        bound_args = inspect.signature(func).bind(*args, **kwargs)
        bound_args.apply_defaults()
        last_params = bound_args.arguments  # arguments of the most recent logged call

        # A new UUID job name is generated for every call, so each call is its own job.
        # NOTE(review): if one job per function was intended, use func_name instead.
        job = Job(namespace=namespace, name=str(generate_new_uuid()))
        runId = str(generate_new_uuid())
        run = Run(
            runId=runId,
            facets={
                "transformation_description": description,
                'name': func_name,
                'executed by': executor,
                'arguments': last_params,
            })

        input_data = args[0] if args else None
        output_data = func(*args, **kwargs)

        # NOTE(review): str() of large containers (e.g. DataFrames) may be a truncated
        # repr, so different data could hash identically — consider a content-based hash.
        input_hash = generate_dataset_hash(str(input_data))
        output_hash = generate_dataset_hash(str(output_data))

        # Resolve the input name BEFORE registering this run's output. Otherwise a call
        # whose output equals its input would be recorded as consuming its own output.
        # "input_data" is what the former f"{input_data=}" trick always evaluated to.
        input_dataset_name = seen_datasets.get(input_hash, "input_data")
        output_name = 'output' + runId
        seen_datasets[output_hash] = output_name

        input_dataset = Dataset(
            namespace=namespace,
            name=input_dataset_name,
            facets={
                "dataHash": {"type": "CustomFacet", "hash": input_hash},
                'type': type(input_data),
            })
        output_dataset = Dataset(
            namespace=namespace,
            name=output_name,
            facets={
                "dataHash": {"type": "CustomFacet", "hash": output_hash},
                'type': type(output_data),
            })

        client.emit(
            RunEvent(
                eventType=RunState.COMPLETE,
                eventTime=datetime.now().isoformat(),
                run=run,
                job=job,
                inputs=[input_dataset],
                outputs=[output_dataset],
            ))

        return output_data

    # functools.wraps already preserves func.__name__/__doc__ on the wrapper.
    return wrapper

### Basic Example (Decorated User-Defined Functions)

In [20]:
# Seed dataset for the lineage demo, read relative to the working directory.
df_1 = pd.read_csv(filepath_or_buffer='technosphere exchanges description.csv')

In [21]:
@log_job_2
def times_two(input = 154):
    '''multiplies the input by 2'''
    # log_job_2 emits this docstring as lineage metadata, so it is runtime data.
    # The default (154) is only used when no argument is passed.
    return input * 2

In [22]:
# Pass df_1 positionally: log_job_2 only treats the first *positional* argument as the
# run's input dataset.
df_2 = times_two(df_1)

In [23]:
@log_job_2
def first_forward(input):
    '''removes the first column'''
    # The docstring is emitted as lineage metadata by log_job_2; keep it deliberate.
    # Reject non-DataFrame input explicitly. Previously it fell through to an
    # UnboundLocalError because `output` was only assigned inside the type check.
    if not isinstance(input, pd.DataFrame):
        raise TypeError(
            f"first_forward expects a pandas DataFrame, got {type(input).__name__}")
    # Positional slice drops column 0; copy() detaches the result from the input frame.
    output = input.iloc[:, 1:].copy()
    return output

In [24]:
# df_2's hash was recorded as the previous run's output, so this run's input is linked to it.
df_3 = first_forward(df_2)

In [25]:
@log_job_2
def transposer(input):
    '''transposes the matrix'''
    # The docstring is emitted as lineage metadata by log_job_2; keep it deliberate.
    # Reject non-DataFrame input explicitly. Previously it fell through to an
    # UnboundLocalError because `output` was only assigned inside the type check.
    if not isinstance(input, pd.DataFrame):
        raise TypeError(
            f"transposer expects a pandas DataFrame, got {type(input).__name__}")
    output = input.T
    return output

In [26]:
# Another logged run; its input hash matches df_3, the output recorded by the previous run.
df_4 = transposer(df_3)

In [27]:
# Reuse times_two on the transposed frame; every call is logged as a separate run.
df_5 = times_two(df_4)

### Ecoinvent Example

In [28]:
import sys
# NOTE(review): this printed 10000 although the limit is only raised in the next cell.
# The value presumably comes from an earlier kernel run (out-of-order execution).
print(sys.getrecursionlimit())

10000


In [29]:
# Raise the interpreter's recursion ceiling before the bw2io import below.
# NOTE(review): a higher limit only postpones a RecursionError caused by unbounded
# recursion, and very high limits can crash the interpreter (C-stack overflow).
sys.setrecursionlimit(10_000)

In [30]:
import importlib

# Reload bw2io's top-level module.
# NOTE(review): importlib.reload() re-executes only bw2io/__init__.py. Submodules are not
# reloaded, so class methods patched in place on classes defined there stay patched.
# This is not a full fresh start.
importlib.reload(bi)

<module 'bw2io' from 'c:\\miniconda\\envs\\brightcon\\Lib\\site-packages\\bw2io\\__init__.py'>

In [31]:
### I commented out the last branch (submodules) to avoid the recursion error (spoiler alert: it still happens)

import sys
import inspect

def apply_logging_to_package(package, decorator):
    """Wrap the functions and class methods found on ``package`` with ``decorator``.

    Module-level functions are replaced on ``package`` itself. Plain functions found on
    classes (including inherited ones) are replaced on each class. Submodules are not
    traversed.

    Skipped on purpose:
      * dunder methods (``__init__``, ``__str__``, ``__repr__`` ...). A lineage decorator
        such as log_job_2 calls ``str()`` on its first argument, so a wrapped
        ``__str__``/``__repr__`` would call itself forever (RecursionError).
      * callables this function already wrapped, so re-running the cell (or visiting a
        subclass after its base) does not stack decorators.

    Static methods are re-wrapped in ``staticmethod`` so they do not become instance
    methods that receive ``self``.
    """
    def _should_wrap(name, fn):
        # Dunders are invoked implicitly by Python (and by the decorator via str()).
        if name.startswith('__') and name.endswith('__'):
            return False
        return not getattr(fn, '_lineage_wrapped', False)

    def _wrap(fn):
        # Decorate and tag the result so later passes recognise it.
        wrapped = decorator(fn)
        try:
            wrapped._lineage_wrapped = True
        except AttributeError:
            pass  # decorator returned an object that does not accept attributes
        return wrapped

    for name, obj in inspect.getmembers(package):
        if inspect.isfunction(obj):
            if _should_wrap(name, obj):
                setattr(package, name, _wrap(obj))

        elif inspect.isclass(obj):
            for method_name, method in inspect.getmembers(obj, predicate=inspect.isfunction):
                if not _should_wrap(method_name, method):
                    continue
                wrapped = _wrap(method)
                # getmembers() unwraps staticmethods; restore the descriptor type.
                if isinstance(inspect.getattr_static(obj, method_name), staticmethod):
                    wrapped = staticmethod(wrapped)
                setattr(obj, method_name, wrapped)

# Patch bw2io in place so calls to its top-level functions and class methods go through
# log_job_2. This mutates the imported package for the rest of the kernel session.
apply_logging_to_package(bi, log_job_2)

In [32]:
### This installs only the biosphere project; it would probably be worth circling back to it later.

# Restore the ecoinvent 3.10 biosphere project from its backup archive into a project
# named "hack-a-fun!!!". The call returns that name, shown in the output below.
bi.remote.install_project(
    project_name="hack-a-fun!!!",
    project_key="ecoinvent-3.10-biosphere", 
    projects_config={"ecoinvent-3.10-biosphere": "ecoinvent-3.10-biosphere.bw2.tar.gz"} )

Restoring project backup archive - this could take a few minutes...


'hack-a-fun!!!'

In [33]:
# Activate the project restored above; the following bw.databases / bw2io cells use it.
bw.projects.set_current('hack-a-fun!!!')

In [34]:
# List the databases in the current project (only the biosphere at this point).
bw.databases

Databases dictionary with 1 object(s):
	ecoinvent-3.10-biosphere

In [35]:
# The packages used here currently have incompatibility issues with NumPy 2.x. Check the
# major version explicitly (any 2.x release, not only 2.0) instead of eyeballing it.
if int(np.__version__.split('.')[0]) >= 2:
    import warnings
    warnings.warn(
        f"NumPy {np.__version__} detected; this notebook expects numpy<2",
        RuntimeWarning,
    )
np.__version__

'1.26.4'

In [36]:
# Folder of ecospold2 files to import. Set the ECOINVENT_DIR environment variable to
# override this machine-specific default. Pointing it at the full
# "ecoinvent 3.10_cutoff_ecoSpold02/datasets" folder reportedly takes ~10.5 minutes.
db_path = os.environ.get('ECOINVENT_DIR', '/Users/Farhang Raymand/Downloads/ecoinvent_test')
db_name = 'ecoinvent 3.10 cutoff -  lineage'

# Guard on the same name that is about to be written. The check previously used a
# different name ('ecoinvent 3.10 small -  lineage'), so it never detected db_name.
# NOTE(review): when this branch is taken, `ei` is not defined for the next cell.
if db_name in bw.databases:
    print("Database has already been imported")
else:
    ei = bi.SingleOutputEcospold2Importer(db_path, db_name)

Extracting ecospold2 files:
0% [##############################] 100% | ETA: 00:00:00 | Item ID: ffaf660d-37cc-5
Total time elapsed: 00:00:27


Title: Extracting ecospold2 files:
  Started: 09/27/2024 11:28:40
  Finished: 09/27/2024 11:29:07
  Total time elapsed: 00:00:27
  CPU %: 85.20
  Memory %: 4.52
Extracted 2905 datasets in 27.21 seconds


In [37]:
# Apply the importer's strategies (listed in the output below), report statistics, then
# write the database.
# NOTE(review): the RecursionError below is raised while running on the logging-wrapped
# bw2io classes. A likely cause is apply_logging_to_package wrapping dunder methods such
# as __str__/__repr__, while log_job_2 calls str() on its first argument. Confirm with
# the full traceback.
ei.apply_strategies() 
ei.statistics()
ei.write_database()

Applying strategy: normalize_units
Applying strategy: update_ecoinvent_locations
Applying strategy: remove_zero_amount_coproducts
Applying strategy: remove_zero_amount_inputs_with_no_activity
Applying strategy: remove_unnamed_parameters
Applying strategy: es2_assign_only_product_with_amount_as_reference_product
Applying strategy: assign_single_product_as_activity
Applying strategy: create_composite_code
Applying strategy: drop_unspecified_subcategories
Applying strategy: fix_ecoinvent_flows_pre35


RecursionError: maximum recursion depth exceeded