# PyNutrien Demo/Development notebook

## Setup

1) Install the AWS CLI and set your credentials in `~/.aws/credentials`.
2) Set the profile's region in `~/.aws/config` (TODO: see link).
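Both files use an INI-style layout with one section per profile. As a quick sanity check of these two steps, the sketch below reads the same layout with Python's standard `configparser`. The key values and regions are placeholders, and `profile_region` and `has_credentials` are hypothetical helpers, not part of pynutrien or the AWS CLI.

```python
import configparser

# Placeholder file contents; substitute your own keys and region.
CREDENTIALS_TEXT = """
[default]
aws_access_key_id = AKIAIOSFODNN7EXAMPLE
aws_secret_access_key = wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
"""

CONFIG_TEXT = """
[default]
region = us-east-1

[profile dev]
region = ca-central-1
"""


def profile_region(config_text, profile="default"):
    """Return the region for `profile` from ~/.aws/config-style text, or None if unset."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(config_text)
    # In ~/.aws/config, named profiles use a "profile " prefix; [default] does not.
    section = profile if profile == "default" else f"profile {profile}"
    return parser.get(section, "region", fallback=None)


def has_credentials(credentials_text, profile="default"):
    """Check that ~/.aws/credentials-style text defines both keys for `profile`."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(credentials_text)
    # The credentials file uses the bare profile name as the section (no prefix).
    return all(
        parser.get(profile, key, fallback=None)
        for key in ("aws_access_key_id", "aws_secret_access_key")
    )


print(profile_region(CONFIG_TEXT))         # us-east-1
print(profile_region(CONFIG_TEXT, "dev"))  # ca-central-1
print(has_credentials(CREDENTIALS_TEXT))   # True
```

To check the real files, pass the text of `~/.aws/config` and `~/.aws/credentials` instead of the placeholders; boto3 resolves the region from the same file when it creates a client.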



In [None]:

!pip install -r requirements.txt


## Setup Validation

In [12]:
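import os
import sys
from pathlib import Path

# Uncomment to import pynutrien from the parent of the notebook directory
# (e.g. a repository checkout) instead of an installed copy:
# lib_path = str(Path(os.getcwd()).parent.absolute().resolve())
# sys.path.insert(0, lib_path)

sys.path  # display the module search path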

['/home/glue_user/workspace', '/home/glue_user/workspace/jupyter_workspace', '/home/glue_user/aws-glue-libs/PyGlue.zip', '/home/glue_user/spark/python/lib/py4j-0.10.9-src.zip', '/home/glue_user/spark/python', '/usr/lib64/python37.zip', '/usr/lib64/python3.7', '/usr/lib64/python3.7/lib-dynload', '', '/home/glue_user/.local/lib/python3.7/site-packages', '/usr/lib64/python3.7/site-packages', '/usr/lib/python3.7/site-packages', '/home/glue_user/.local/lib/python3.7/site-packages/IPython/extensions', '/home/glue_user/.ipython']
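The commented-out lines in the cell above make a local checkout importable by putting the notebook's parent directory on `sys.path`. A slightly more defensive sketch of the same idea, assuming the repository root is that parent directory, only touches `sys.path` when the package cannot already be imported. `ensure_importable` is a hypothetical helper, not part of pynutrien.

```python
import importlib.util
import os
import sys
from pathlib import Path


def ensure_importable(package="pynutrien", start=None):
    """Prepend the parent of `start` (default: cwd) to sys.path if `package` can't be imported.

    Returns the directory that was added (or was already present), or None when
    the package is already importable, e.g. because it was installed with pip.
    """
    if importlib.util.find_spec(package) is not None:
        return None
    lib_path = str(Path(start or os.getcwd()).resolve().parent)
    if lib_path not in sys.path:
        sys.path.insert(0, lib_path)
    return lib_path
```

Calling `ensure_importable()` at the top of the notebook leaves an installed pynutrien untouched and falls back to the checkout otherwise.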


In [15]:
import pynutrien

from pynutrien.aws.glue import GlueSparkContext

context = GlueSparkContext()
context.spark_session # show SparkUI

## Imports and Dependencies

In [5]:
%env AWS_PROFILE=default

env: AWS_PROFILE=default


In [6]:
import pynutrien

## Library Usage Examples

### Catalog Reader Example

In [7]:

from pynutrien.aws.glue import GlueSparkContext
from pynutrien.aws.glue.catalog import GlueReader

context = GlueSparkContext()
reader = GlueReader(context.glue_context, None)
df = (
    reader.catalog("crawler_test", "caseware_customer_a")
    .transformation_ctx("cases")
    .dataframe()
)
df.show()

+-----------+-----------+------+-------------+----------+---------------+--------------+-----+-----------------+------------+-------+----------------+----+
|primary_key|       name|gender|customer_type|birth_date|    personal_id|   card_number|email|     phone_number|country_code|address|meta_last_update|year|
+-----------+-----------+------+-------------+----------+---------------+--------------+-----+-----------------+------------+-------+----------------+----+
|     135567|  T. Oakman|      |       Active|          | SIN: 154720759|Not Applicable|     |   1 822 435 3970|          CA|       |      2008-08-12|2017|
|     135635|B. Kanahele|      |     Prospect|1959-08-20| SIN: 755909975|Not Applicable|     |            -2865|          CA|       |      2008-08-08|2017|
|     135844|  B. Wandel|     M|       Active|1956-06-15|NINO: VT931050C|Not Applicable|     |                 |          CA|       |      2008-01-05|2017|
|     136073|    S. Naff|     M|       Active|1981-06-22| SSN: 8

### Job

In [8]:
# Simulate the command-line arguments a Glue job would receive
import sys
sys.argv.extend(["--cfg_file_path", "dummy", "--env_file_path", "dummy", "--abc", "123"])

This code defines a custom GlueJob subclass called MyGlueJob. It sets three class attributes, job_name, arguments, and input_path, and implements three methods: extract, transform, and load. The arguments list presumably names the job-specific argument to parse (`abc`, matching the `--abc 123` flag added to `sys.argv` above).

The extract method uses the Glue context to create a DynamicFrame from the JSON files under input_path. The transform and load methods are placeholders with no implementation.

To run the job, create an instance of MyGlueJob and call its run method. As the log below shows, run first performs a setup step, which logs the library version and the supplied and parsed arguments, and then calls extract, transform, and load in sequence.

In [9]:

from pynutrien.aws.glue import GlueJob

class MyGlueJob(GlueJob):
    job_name = "TEST_GLUE"
    arguments = ["abc"]
    input_path = "s3://awsglue-datasets/examples/us-legislators/all/persons.json"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def extract(self):
        # Read every JSON file under input_path into a Glue DynamicFrame
        self.dynamicframe = self.glue_context.create_dynamic_frame.from_options(
            connection_type="s3",
            connection_options={"paths": [self.input_path], "recurse": True},
            format="json",
        )

    def transform(self):
        pass

    def load(self):
        pass


job = MyGlueJob()
job.run()


2022-12-09 16:50:45.706 [INFO] [logger:75] MainThread:TEST_GLUE - Starting: TEST_GLUE
2022-12-09 16:50:45.707 [INFO] [logger:75] MainThread:TEST_GLUE - Starting: setup
2022-12-09 16:50:45.711 [INFO] [job:71] MainThread:TEST_GLUE - Library Version: '1.0.0'
2022-12-09 16:50:45.714 [INFO] [job:72] MainThread:TEST_GLUE - Supplied Arguments: ['/home/glue_user/.local/lib/python3.7/site-packages/ipykernel_launcher.py', '--ip=127.0.0.1', '--stdin=9019', '--control=9017', '--hb=9016', '--Session.signature_scheme="hmac-sha256"', '--Session.key=b"8fa504da-70d4-4fc4-b848-34bfccb8d45d"', '--shell=9018', '--transport="tcp"', '--iopub=9020', '--f=/home/glue_user/.local/share/jupyter/runtime/kernel-v2-708vv2DaeU9VqT4.json', '--cfg_file_path', 'dummy', '--env_file_path', 'dummy', '--abc', '123']
2022-12-09 16:50:45.716 [INFO] [job:73] MainThread:TEST_GLUE - Parsed Arguments: {'job_bookmark_option': 'job-bookmark-disable', 'job_bookmark_from': None, 'job_bookmark_to': None, 'JOB_ID': None, 'JOB_RUN_ID':
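The log shows that the job receives every argument of the Jupyter kernel process, including kernel flags such as `--ip=127.0.0.1`, while the parsed result lists named job options. A sketch of that kind of filtering with the standard library's argparse follows. `parse_job_args` is hypothetical and pynutrien's actual parser is not shown here (in AWS Glue scripts, `awsglue.utils.getResolvedOptions(sys.argv, [...])` serves a similar purpose). `allow_abbrev=False` keeps an unrelated flag from being mistaken for an abbreviation of a job option.

```python
import argparse


def parse_job_args(argv, options):
    """Return {option: value} for the requested --options, ignoring every other argument."""
    parser = argparse.ArgumentParser(add_help=False, allow_abbrev=False)
    for name in options:
        parser.add_argument(f"--{name}")
    known, _unknown = parser.parse_known_args(argv)  # kernel flags land in _unknown
    return vars(known)


# Abridged, made-up argv modeled on the one logged above: kernel flags, then job options
argv = [
    "ipykernel_launcher.py", "--ip=127.0.0.1", "--shell=9018", "--f=/tmp/kernel.json",
    "--cfg_file_path", "dummy", "--env_file_path", "dummy", "--abc", "123",
]
print(parse_job_args(argv[1:], ["cfg_file_path", "env_file_path", "abc"]))
# {'cfg_file_path': 'dummy', 'env_file_path': 'dummy', 'abc': '123'}
```

Options that are requested but not supplied come back as `None`, which makes a missing required argument easy to detect before the job starts.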

### DynamoDB

This code uses the boto3 library to interact with Amazon DynamoDB. It first creates a boto3 Session and a low-level DynamoDB client, creates the Music table with create_table, and uses a waiter to block until the table exists. It then inserts three items with batch_write_item, writes one item with put_item, and reads one back with get_item.

Next, the code creates an instance of pynutrien's DynamoDB class, a wrapper around the client. It uses this instance to retrieve the same item, to update an item with update_item, and to scan the whole Music table with scan_table. After each call it asserts that the result matches what the raw client returns (or, for the update, the expected attributes).

Finally, the code creates an instance of the DynamoDBPandas class, which, judging by its name, is aimed at pandas workflows; in this example it is passed plain lists of dicts. It uses this instance to get a table object for Music and to insert and then delete items, checking the results through that table object. The cell ends by deleting the Music table with delete_table.

In [15]:
import boto3
from pynutrien.aws.dynamodb import DynamoDB, DynamoDBPandas

session = boto3.session.Session()
client = session.client("dynamodb")
response = client.create_table(
    AttributeDefinitions=[
        {
            'AttributeName': 'Artist',
            'AttributeType': 'S',
        },
        {
            'AttributeName': 'SongTitle',
            'AttributeType': 'S',
        },
    ],
    KeySchema=[
        {
            'AttributeName': 'Artist',
            'KeyType': 'HASH',
        },
        {
            'AttributeName': 'SongTitle',
            'KeyType': 'RANGE',
        },
    ],
    ProvisionedThroughput={
        'ReadCapacityUnits': 5,
        'WriteCapacityUnits': 5,
    },
    TableName='Music',
)
waiter = client.get_waiter("table_exists")
waiter.wait(TableName="Music", WaiterConfig={"Delay": 1, "MaxAttempts": 100})
response = client.batch_write_item(
    RequestItems={
        "Music": [
            {
                "PutRequest": {
                    "Item": {
                        "AlbumTitle": {
                            "S": "Somewhat Famous",
                        },
                        "Artist": {
                            "S": "No One You Know",
                        },
                        "SongTitle": {
                            "S": "Call Me Today",
                        },
                    },
                },
            },
            {
                "PutRequest": {
                    "Item": {
                        "AlbumTitle": {
                            "S": "Songs About Life",
                        },
                        "Artist": {
                            "S": "Acme Band",
                        },
                        "SongTitle": {
                            "S": "Happy Day",
                        },
                    },
                },
            },
            {
                "PutRequest": {
                    "Item": {
                        "AlbumTitle": {
                            "S": "Blue Sky Blues",
                        },
                        "Artist": {
                            "S": "No One You Know",
                        },
                        "SongTitle": {
                            "S": "Scared of My Shadow",
                        },
                    },
                },
            },
        ],
    },
)
response = client.put_item(
    Item={
        "AlbumTitle": {
            "S": "Somewhat Famous",
        },
        "Artist": {
            "S": "No One You Know",
        },
        "SongTitle": {
            "S": "Call Me Today",
        },
    },
    ReturnConsumedCapacity="TOTAL",
    TableName="Music",
)
my_dyno = DynamoDB(session)
response_get = client.get_item(
    Key={
        "Artist": {
            "S": "Acme Band",
        },
        "SongTitle": {
            "S": "Happy Day",
        },
    },
    TableName="Music",
)
my_dyno_get = my_dyno.get_item(
    key={
        "Artist": {
            "S": "Acme Band",
        },
        "SongTitle": {
            "S": "Happy Day",
        },
    },
    table_name="Music",
)
assert my_dyno_get == response_get["Item"]
update_response = my_dyno.update_item(
    ExpressionAttributeNames={
        "#AT": "AlbumTitle",
        "#Y": "Year",
    },
    ExpressionAttributeValues={
        ":t": {
            "S": "Louder Than Ever",
        },
        ":y": {
            "N": "2015",
        },
    },
    key={
        "Artist": {
            "S": "Acme Band",
        },
        "SongTitle": {
            "S": "Happy Day",
        },
    },
    ReturnValues="ALL_NEW",
    table_name="Music",
    UpdateExpression="SET #Y = :y, #AT = :t",
)
assert update_response["Attributes"] == {
    "AlbumTitle": {"S": "Louder Than Ever"},
    "Artist": {"S": "Acme Band"},
    "Year": {"N": "2015"},
    "SongTitle": {"S": "Happy Day"},
}
scan_response = my_dyno.scan_table(table_name="Music")
response = client.scan(TableName="Music")
assert response["Items"] == scan_response["Items"]
my_dyno_pd = DynamoDBPandas(session)
dynamo_table = my_dyno_pd.get_table(table_name="Music")
assert dynamo_table.name == "Music"
my_dyno_pd.put_items(
    table_name="Music",
    item_list=[
        {"Artist": "Nobody", "SongTitle": "Not Famous"},
        {"Artist": "Nicho", "SongTitle": "Bad song"},
    ],
)
assert dynamo_table.get_item(Key={"Artist": "Nobody", "SongTitle": "Not Famous"})[
    "Item"
] == {"Artist": "Nobody", "SongTitle": "Not Famous"}
my_dyno_pd.delete_items(
    table_name="Music",
    item_list=[
        {"Artist": "Nobody", "SongTitle": "Not Famous"},
        {"Artist": "Nicho", "SongTitle": "Bad song"},
    ],
)
after_del_resp = dynamo_table.get_item(
    Key={"Artist": "Nobody", "SongTitle": "Not Famous"}
)
assert "Item" not in after_del_resp
response = client.delete_table(TableName="Music")


2022-12-09 16:56:33.632 [INFO] [credentials:1223] MainThread:botocore.credentials - Found credentials in shared credentials file: ~/.aws/credentials
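The assertions in this cell mix two item formats: the low-level client (and, judging by the comparisons, `my_dyno.get_item`) uses typed attribute values such as `{"S": "Acme Band"}`, while `dynamo_table.get_item` returns plain values such as `"Acme Band"`. Below is a minimal sketch of the conversion between them, covering only the common types (`S`, `N`, `BOOL`, `NULL`, `L`, `M`). In practice boto3 ships `TypeSerializer` and `TypeDeserializer` in `boto3.dynamodb.types` for this; the helper names below are hypothetical.

```python
from decimal import Decimal


def from_attribute(value):
    """Convert one low-level AttributeValue, e.g. {"S": "x"}, to a plain Python value."""
    [(tag, inner)] = value.items()  # each AttributeValue has exactly one type key
    if tag in ("S", "BOOL"):
        return inner
    if tag == "N":
        return Decimal(inner)  # numbers travel as strings; Decimal avoids float rounding
    if tag == "NULL":
        return None
    if tag == "L":
        return [from_attribute(v) for v in inner]
    if tag == "M":
        return {k: from_attribute(v) for k, v in inner.items()}
    raise ValueError(f"unsupported DynamoDB type: {tag}")


def to_attribute(value):
    """Convert a plain Python value to a low-level AttributeValue (floats are rejected)."""
    if value is None:
        return {"NULL": True}
    if isinstance(value, bool):  # check bool before int: bool is a subclass of int
        return {"BOOL": value}
    if isinstance(value, (int, Decimal)):
        return {"N": str(value)}
    if isinstance(value, str):
        return {"S": value}
    if isinstance(value, list):
        return {"L": [to_attribute(v) for v in value]}
    if isinstance(value, dict):
        return {"M": {k: to_attribute(v) for k, v in value.items()}}
    raise TypeError(f"unsupported type: {type(value).__name__}")


def item_from_attributes(item):
    """Convert a low-level item (attribute name -> AttributeValue) to a plain dict."""
    return {name: from_attribute(attr) for name, attr in item.items()}


def item_to_attributes(item):
    """Convert a plain dict to a low-level item."""
    return {name: to_attribute(value) for name, value in item.items()}


# The attributes asserted after update_item in the cell above
updated = {
    "AlbumTitle": {"S": "Louder Than Ever"},
    "Artist": {"S": "Acme Band"},
    "Year": {"N": "2015"},
    "SongTitle": {"S": "Happy Day"},
}
print(item_from_attributes(updated))
# {'AlbumTitle': 'Louder Than Ever', 'Artist': 'Acme Band', 'Year': Decimal('2015'), 'SongTitle': 'Happy Day'}
```

Like boto3's serializer, this sketch refuses floats; convert them to `Decimal` first.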


### Redshift

This code uses the RedshiftConnector class to connect to an Amazon Redshift cluster. The RedshiftConnector class takes several arguments, including the iam flag to indicate whether to use IAM authentication, the name of the database to connect to, the username and password for the database user, and the identifier for the Redshift cluster.

Once the RedshiftConnector instance is created, you can use it to interact with the Redshift cluster. For example, you could use the query method to execute SQL queries on the cluster, or the fetch_df method to fetch the results of a query as a Pandas dataframe.

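The following code works only if `REDSHIFT_USER` and `REDSHIFT_PASSWORD` are defined in a `.env` file in the current working directory. If they are not, `os.getenv` returns `None` for both, and the constructor raises a `RuntimeError` like the one in the output below.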

In [18]:
import os

from dotenv import load_dotenv
from pynutrien.aws.redshift import RedshiftConnector

load_dotenv()  # load variables from a .env file into os.environ
redshift_user = os.getenv('REDSHIFT_USER')
redshift_password = os.getenv('REDSHIFT_PASSWORD')
conn = RedshiftConnector(
    iam=True,  # authenticate with IAM
    database='dev',
    db_user=redshift_user,
    password=redshift_password,
    user=redshift_user,
    cluster_identifier='nutrien-insights-redshift-cluster-test',
)



RuntimeError: Missing required arguments. Please make sure arguments and values exist for                 'database','db_user', 'password', 'user', 'cluster_identifier'
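Because the error above is generic, it can help to check the credentials before constructing the connector. The sketch below shows, in simplified form, what `load_dotenv` does (read `KEY=VALUE` lines, skip comments, strip quotes, leave already-set variables alone) and reports which settings are missing. `load_env_text` and `missing_settings` are hypothetical helpers; python-dotenv itself supports more of the `.env` syntax than this.

```python
import os


def load_env_text(text, environ=None):
    """Load KEY=VALUE lines into `environ` (default: os.environ) without overriding set keys.

    A simplified stand-in for python-dotenv's load_dotenv: skips blank lines and
    comments and strips matching surrounding quotes, nothing more.
    """
    environ = os.environ if environ is None else environ
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = (part.strip() for part in line.split("=", 1))
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        environ.setdefault(key, value)  # existing variables take precedence
    return environ


def missing_settings(environ, names=("REDSHIFT_USER", "REDSHIFT_PASSWORD")):
    """Return the names that are unset or empty in `environ`."""
    return [name for name in names if not environ.get(name)]


env = load_env_text('REDSHIFT_USER=analyst\nREDSHIFT_PASSWORD="example-password"\n', environ={})
print(missing_settings(env))  # []
print(missing_settings({}))   # ['REDSHIFT_USER', 'REDSHIFT_PASSWORD']
```

In the notebook, checking `missing_settings(os.environ)` right after `load_dotenv()` and stopping with a specific message would fail faster and more clearly than the connector's error.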

### Config

This code defines four string variables containing sample data in JSON, YAML, TOML, and INI formats, respectively. It also defines a sample_dict dictionary, which is not used in this cell.

Next, the code creates a single instance of pynutrien's ConfigParser class (not to be confused with the standard library's `configparser.ConfigParser`) and calls its parse_json, parse_ini, parse_yaml, and parse_toml methods on the corresponding text. Each method returns the parsed data as a nested Python dictionary, as the printed output shows; note that INI values come back as strings, while YAML and TOML keep types such as integers.

In [23]:
import json
from pynutrien.core.config import ConfigParser
json_text = """
{
"fruit": {
    "apple": {
        "size": "small",
        "color": "red",
        "country": "USA"
    },
    "banana":{
        "size": "medium",
        "color": "yellow",
        "country": "Fiji"
    },
    "orange":{
        "size": "large",
        "color": "orange",
        "country": "Egypt"
    }
}
}
"""
yaml_text = """
    name: "Vishvajit"
    age: 23
    address: Noida
    Occupation: Software Developer
    Skills:
    - Python
    - Django
    - Flask
    - FastAPI
    - DRF ( Django Rest Framework )
"""
toml_text = """
    [user]
    player_x.color = "blue"
    player_o.color = "green"

    [constant]
    board_size = 3

    [server]
    url = "https://tictactoe.example.com"
"""
ini_text = """
    [apple]
    size = small
    color = red
    country = USA
"""
sample_dict = {
    "Vegetable": {
        "carrot": {"size": "cylindrical", "color": "orange", "country": "America"}
    }
}
my_config_parser = ConfigParser()
print(my_config_parser.parse_json(json_text))
print(my_config_parser.parse_ini(ini_text))
print(my_config_parser.parse_yaml(yaml_text))
print(my_config_parser.parse_toml(toml_text))

{'fruit': {'apple': {'size': 'small', 'color': 'red', 'country': 'USA'}, 'banana': {'size': 'medium', 'color': 'yellow', 'country': 'Fiji'}, 'orange': {'size': 'large', 'color': 'orange', 'country': 'Egypt'}}}
{'apple': {'size': 'small', 'color': 'red', 'country': 'USA'}}
{'name': 'Vishvajit', 'age': 23, 'address': 'Noida', 'Occupation': 'Software Developer', 'Skills': ['Python', 'Django', 'Flask', 'FastAPI', 'DRF ( Django Rest Framework )']}
{'user': {'player_x': {'color': 'blue'}, 'player_o': {'color': 'green'}}, 'constant': {'board_size': 3}, 'server': {'url': 'https://tictactoe.example.com'}}
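pynutrien's parser implementations are not shown here. For comparison, the INI case can be reproduced with the standard library alone, as sketched below, together with a small recursive merge, one plausible use for the otherwise unused `sample_dict`. `parse_ini` and `deep_merge` are hypothetical helpers, not pynutrien's API. (JSON is covered by `json.loads`, TOML by `tomllib` on Python 3.11+, and YAML needs a third-party package such as PyYAML.)

```python
import configparser
import json


def parse_ini(text):
    """Parse INI text into {section: {option: value}}; all values stay strings."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)  # note: option names are lowercased by default
    return {section: dict(parser[section]) for section in parser.sections()}


def deep_merge(base, override):
    """Return a new dict with `override` merged into `base`, recursing into nested dicts.

    Values from `override` win on conflicts; neither input is modified.
    """
    merged = dict(base)
    for key, value in override.items():
        if isinstance(merged.get(key), dict) and isinstance(value, dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


ini_text = """
    [apple]
    size = small
    color = red
    country = USA
"""
print(parse_ini(ini_text))
# {'apple': {'size': 'small', 'color': 'red', 'country': 'USA'}}

fruit = json.loads('{"fruit": {"apple": {"size": "small"}}}')
sample_dict = {"Vegetable": {"carrot": {"size": "cylindrical", "color": "orange", "country": "America"}}}
print(sorted(deep_merge(fruit, sample_dict)))  # ['Vegetable', 'fruit']
```

A recursive merge, rather than `dict.update`, keeps sibling keys inside nested sections when a second configuration overrides only part of them.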


### Logger

This code imports the sleep function from the time module, creates two Logger objects, and uses the RuntimeLogger context manager to time the execution of several blocks of code.

The Logger class creates logger objects that log messages at different levels (info, warning, and so on). The name passed to the first Logger call determines the logger's name; if no name is provided, the name defaults to "" (an empty string).

In this code, the first Logger object is created with `__name__`, the name of the module in which the code runs (`__main__` in a notebook). The second Logger object is created without any arguments, but because of the singleton behavior described next, it is the same instance and keeps the name `__main__`, as the log output confirms.

The assert statement checks that the two Logger objects are the same object (that is, both variables refer to the same instance of the Logger class). This holds because the Logger class uses the singleton design pattern: the first call creates the instance, and every later call to the Logger constructor returns that same instance.

The logger object is then used to log messages at the info and warning levels.

The RuntimeLogger context manager times the execution of a block of code. Its constructor takes two arguments: a logger object and a label naming the operation being timed. Entering the with block logs "Starting: <label>" and starts a timer; leaving it logs "End: <label>" with the elapsed time. If the block raises an exception, RuntimeLogger logs the end marked "with error", logs the exception, and re-raises it, which is why the cell ends with an uncaught RuntimeError.

In [24]:

from time import sleep
from pynutrien.core.logger import Logger,RuntimeLogger
logger = Logger(__name__)
logger2 = Logger()

assert logger is logger2

logger.info("Hello")
logger.warning("Hello")

with RuntimeLogger(logger, "time_this"):
    sleep(0.1)

# Avoid this pattern: RuntimeLogger logs the error and its traceback
# even though the exception is caught and handled below
try:
    with RuntimeLogger(logger, "time_this_2"):
        sleep(0.1)
        raise RuntimeError("oops")
        sleep(0.1)  # never reached
except RuntimeError:
    pass

# When debugging, the traceback is printed twice, once to stderr and once to stdout;
# in the Glue environment the output is split into the separate output streams
with RuntimeLogger(logger, "time_this_3"):
    sleep(0.1)
    raise RuntimeError("oops2")  # deliberately uncaught: RuntimeLogger re-raises it
    sleep(0.1)  # never reached


2022-12-09 17:04:30.637 [INFO] [<ipython-input-24-d89f52f7d5d6>:8] MainThread:__main__ - Hello
2022-12-09 17:04:30.657 [INFO] [logger:75] MainThread:__main__ - Starting: time_this
2022-12-09 17:04:30.774 [INFO] [logger:79] MainThread:__main__ - End: time_this [0.116s]
2022-12-09 17:04:30.776 [INFO] [logger:75] MainThread:__main__ - Starting: time_this_2
2022-12-09 17:04:30.886 [INFO] [logger:89] MainThread:__main__ - End: time_this_2 [0.110s] with error
2022-12-09 17:04:30.888 [ERROR] [logger:93] MainThread:__main__ - Exception Raised: RuntimeError('oops')
2022-12-09 17:04:30.893 [INFO] [logger:75] MainThread:__main__ - Starting: time_this_3
2022-12-09 17:04:30.994 [INFO] [logger:89] MainThread:__main__ - End: time_this_3 [0.101s] with error
2022-12-09 17:04:30.996 [ERROR] [logger:93] MainThread:__main__ - Exception Raised: RuntimeError('oops2')


RuntimeError: oops2
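pynutrien's Logger and RuntimeLogger implementations are not shown in this notebook. The sketch below reproduces the behavior described above with the standard `logging` module, as an illustration under assumptions rather than the library's code: `SingletonLogger` returns one shared instance no matter what name later calls pass, and `runtime_logger` logs start, end, and elapsed time, and on error logs the exception and re-raises it. Both names are hypothetical. The comments in the cell suggest RuntimeLogger also prints the traceback; `logger.exception(...)` would do that in place of `logger.error(...)`.

```python
import logging
import time
from contextlib import contextmanager


class SingletonLogger:
    """Every call returns the same wrapper; only the first call's name is used."""

    _instance = None

    def __new__(cls, name=""):
        if cls._instance is None:
            instance = super().__new__(cls)
            instance._logger = logging.getLogger(name)  # "" gives the root logger
            cls._instance = instance
        return cls._instance

    def __init__(self, name=""):
        pass  # all state is set once in __new__; later calls must not reset it

    def __getattr__(self, attr):
        # Delegate info(), warning(), name, ... to the wrapped logging.Logger
        return getattr(self._logger, attr)


@contextmanager
def runtime_logger(logger, label):
    """Log the start and end of a block with its elapsed time; log and re-raise errors."""
    logger.info("Starting: %s", label)
    start = time.perf_counter()
    try:
        yield
    except Exception as exc:
        logger.info("End: %s [%.3fs] with error", label, time.perf_counter() - start)
        logger.error("Exception Raised: %r", exc)
        raise
    logger.info("End: %s [%.3fs]", label, time.perf_counter() - start)


logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(name)s - %(message)s")
logger = SingletonLogger("__main__")
assert SingletonLogger() is logger and logger.name == "__main__"

with runtime_logger(logger, "time_this"):
    time.sleep(0.1)
```

Re-raising after logging keeps the timing wrapper transparent: callers still see the original exception, which is the behavior the last block in the cell above demonstrates.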

### ETL

This code defines two classes, TestETL1 and TestETL2, that inherit from the ETLExtendedBase class. TestETL1 implements the setup, extract, transform, and load methods, while TestETL2 implements only extract, transform, and load, and its transform method deliberately raises an exception. Both classes set job_name as a class attribute; TestETL2 also includes a commented-out line showing job_name defined as a method instead.

Because they subclass ETLExtendedBase, both classes follow the ETL (extract-transform-load) pattern: data is extracted from one or more sources, transformed into a format suitable for analysis or storage, and then loaded into a destination, such as a database or a data warehouse.

The ETLExtendedBase class is an abstract base class that defines the methods and attributes common to all ETL jobs. Its run method calls setup, extract, transform, and load in order and logs the start, end, and duration of each step, as the output below shows. Since TestETL2 runs without defining setup, the base class presumably provides a default setup. The job_name attribute sets the name of the ETL job, which appears in the log messages. When a step raises an exception, the run stops and the exception propagates to the caller, which is why the cell ends with `Exception: Test`.

In [25]:
from pynutrien.etl import ETLExtendedBase

class TestETL1(ETLExtendedBase):
    job_name = "TEST_ETL_1"

    def setup(self):
        pass

    def extract(self):
        pass

    def transform(self):
        pass

    def load(self):
        pass

class TestETL2(ETLExtendedBase):
    job_name = "TEST_ETL_2"

    # Alternative: define job_name as a method instead of a class attribute
    # def job_name(self): pass

    def extract(self):
        pass

    def transform(self):
        raise Exception("Test")

    def load(self):
        pass

j1 = TestETL1()
j1.run()

j2 = TestETL2()
j2.run()


2022-12-09 17:05:24.560 [INFO] [logger:75] MainThread:__main__ - Starting: TEST_ETL_1
2022-12-09 17:05:24.563 [INFO] [logger:75] MainThread:__main__ - Starting: setup
2022-12-09 17:05:24.566 [INFO] [logger:79] MainThread:__main__ - End: setup [0.002s]
2022-12-09 17:05:24.571 [INFO] [logger:75] MainThread:__main__ - Starting: extract
2022-12-09 17:05:24.572 [INFO] [logger:79] MainThread:__main__ - End: extract [0.001s]
2022-12-09 17:05:24.578 [INFO] [logger:75] MainThread:__main__ - Starting: transform
2022-12-09 17:05:24.579 [INFO] [logger:79] MainThread:__main__ - End: transform [0.001s]
2022-12-09 17:05:24.580 [INFO] [logger:75] MainThread:__main__ - Starting: load
2022-12-09 17:05:24.581 [INFO] [logger:79] MainThread:__main__ - End: load [0.001s]
2022-12-09 17:05:24.583 [INFO] [logger:79] MainThread:__main__ - End: TEST_ETL_1 [0.023s]
2022-12-09 17:05:24.584 [INFO] [logger:75] MainThread:__main__ - Starting: TEST_ETL_2
2022-12-09 17:05:24.585 [INFO] [logger:75] MainThread:__main__ -

Exception: Test
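The run order and error behavior visible in the log can be sketched as a template method on an abstract base class. This is an illustration under assumptions, not ETLExtendedBase's actual code: `MiniETL`, its `completed` list, and the `Ok` and `Failing` subclasses are hypothetical, and the timing and logging around each step are left out. Declaring extract, transform, and load abstract makes Python refuse to instantiate a subclass that omits one of them, while setup gets an empty default, consistent with TestETL2 running without defining setup.

```python
from abc import ABC, abstractmethod


class MiniETL(ABC):
    """Hypothetical stand-in for an ETL base class: run() calls each step in a fixed order."""

    job_name = "UNNAMED"

    def __init__(self):
        self.completed = []  # names of the steps that finished, in order

    def setup(self):
        """Optional hook with an empty default, so subclasses may omit it."""

    @abstractmethod
    def extract(self): ...

    @abstractmethod
    def transform(self): ...

    @abstractmethod
    def load(self): ...

    def run(self):
        for step in (self.setup, self.extract, self.transform, self.load):
            step()  # an exception stops the run and propagates to the caller
            self.completed.append(step.__name__)


class Ok(MiniETL):
    job_name = "OK"

    def extract(self):
        pass

    def transform(self):
        pass

    def load(self):
        pass


class Failing(Ok):
    def transform(self):
        raise Exception("Test")


job = Ok()
job.run()
print(job.completed)  # ['setup', 'extract', 'transform', 'load']
```

Running `Failing().run()` raises `Exception("Test")` after setup and extract have completed, mirroring how TEST_ETL_2 stops in transform.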

### Column

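This code creates a PySpark DataFrame with three columns and two rows of data, then passes it to a function called standardize_column_names to clean up the column names. It prints the type of the result and its schema. The schema shows that surrounding whitespace was stripped from the names ('1         ' became '1') and that they were lowercased ('Price   ' became 'price'), while the column types and nullability were preserved.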

In [27]:

import pyspark.sql.types as T
from pyspark.sql import SparkSession
from pynutrien.util.column import standardize_column_names
spark = SparkSession.builder.getOrCreate()
spark_df_with_integer_headers = spark.createDataFrame(
    data=[
        ('a', 'c', 2),
        ('b', 'd', 4),
    ],
    schema=T.StructType([
        T.StructField('1         ', T.StringType(), False),
        T.StructField('2', T.StringType(), False),
        T.StructField('Price   ', T.IntegerType(), False),
    ]),
)

new_df = standardize_column_names(spark_df_with_integer_headers)
print(type(new_df))
new_df.printSchema()


<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- 1: string (nullable = false)
 |-- 2: string (nullable = false)
 |-- price: integer (nullable = false)
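The exact rules standardize_column_names applies are not shown here; the schema above only confirms that names are stripped and lowercased. A pure-Python sketch of one plausible rule set follows. `standardize_name` is hypothetical, and replacing inner runs of non-alphanumeric characters with `_` is an assumption that the example data can neither confirm nor rule out.

```python
import re


def standardize_name(name):
    """Strip, lowercase, and join runs of non-alphanumeric characters with '_'."""
    cleaned = re.sub(r"[^0-9a-z]+", "_", name.strip().lower())
    return cleaned.strip("_")  # drop underscores left over at either end


columns = ["1         ", "2", "Price   "]
print([standardize_name(c) for c in columns])  # ['1', '2', 'price']
```

With PySpark, the new names can be applied in one step with `df.toDF(*[standardize_name(c) for c in df.columns])`. If two columns standardize to the same name, the result has duplicate columns, so a real implementation may want to check for collisions.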

