# Data science with Python and Netezza Performance Server

After the release of _Netezza Performance Server on Cloud Pak for Data_ (Netezza on Cloud) and _Netezza Performance Server on Cloud Pak for Data System_ (Netezza on prem), the next step in the modernization of Netezza is the ability to couple Netezza's in-database analytics with Python's data science and visualization capabilities.

With the release of [nzpy](https://pypi.org/project/nzpy/), you can connect to and work with Netezza on Cloud and Netezza on prem from any OS that supports Python. It even lets you stream data directly to and from the Netezza system without any other software or drivers.

Let's look at this in two parts:

- The first part covers programming basics with nzpy.
- The second part puts everything together and walks through a simple data science use case.


## Nzpy programming basics

`nzpy` is a pure Python driver implementing the [Python DB-API 2.0](https://www.python.org/dev/peps/pep-0249/) specification, so apart from a `pip install` there are no other prerequisites.

### Install prerequisites

In [None]:
!pip install nzpy

### Connecting to the database 

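A connection needs the host name (or IP address) of the Netezza system, a user name and password, and the database to connect to. The cells below use port 5480.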

In [None]:
# Set up the connection credentials: either read them from a project connection (commented out below) or replace the placeholder values before you start

# from project_lib import Project
# project = Project.access()
# NPS_credentials = project.get_connection(name="NPS")

## OR

username="<username>"
password="<password>"
host="<hostname or ip>"
database="system"
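Rather than typing credentials into the notebook, they could come from environment variables. A minimal sketch follows; the `NZ_USER`, `NZ_PASSWORD`, `NZ_HOST`, `NZ_DATABASE` and `NZ_PORT` variable names are hypothetical choices (nzpy does not read them itself), and the defaults mirror the `system` database and port 5480 used in this notebook.

```python
import os
from typing import Mapping


def netezza_connect_args(env: Mapping[str, str] = os.environ) -> dict:
    """Build keyword arguments for nzpy.connect() from environment variables.

    The NZ_* variable names are hypothetical; nzpy does not read them itself.
    """
    missing = [k for k in ("NZ_USER", "NZ_PASSWORD", "NZ_HOST") if not env.get(k)]
    if missing:
        raise KeyError(f"missing environment variables: {', '.join(missing)}")
    return {
        "user": env["NZ_USER"],
        "password": env["NZ_PASSWORD"],
        "host": env["NZ_HOST"],
        # Fall back to the 'system' database and the port used in this notebook
        "database": env.get("NZ_DATABASE", "system"),
        "port": int(env.get("NZ_PORT", "5480")),
    }
```

With the variables set, the connection would then be `con = nzpy.connect(**netezza_connect_args())`.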


In [None]:
import nzpy
import os

db = 'NZPY_TEST'
con = nzpy.connect(user=username, password=password, host=host,
                   database=database, port=5480)
selectQuery = f"select 1 from _v_database where database = '{db}'"
createQuery = f"create database {db}"
# Make sure the database exists; create it if it does not
with con.cursor() as cur:
    cur.execute(selectQuery)
    r = cur.fetchone()
    if r is None:
        cur.execute(createQuery)
# Close the bootstrap connection before reconnecting
con.close()

# Now connect to the new database
con = nzpy.connect(user=username, password=password, host=host,
                   database=db, port=5480)
cursor = con.cursor()

In [None]:
# Fetch the databases to make sure the new database was created

cursor.execute("select * from _v_database limit 10")
row = cursor.fetchone()
while row: 
    print(row)
    row = cursor.fetchone()


`con` is a Python DB-API [Connection](https://www.python.org/dev/peps/pep-0249/#connection-objects); you interact with the Netezza database through its [Cursor](https://www.python.org/dev/peps/pep-0249/#cursor-objects) objects.
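Since `nzpy` follows PEP 249, the usual DB-API idioms apply to it. For example, a large result can be consumed in batches with `fetchmany()` rather than one `fetchone()` call per row. The sketch below uses the standard library's `sqlite3` module purely as a stand-in that implements the same interface, so it runs without a Netezza server; `iter_rows` is a hypothetical helper name.

```python
import sqlite3
from typing import Any, Iterator


def iter_rows(cursor: Any, batch_size: int = 1000) -> Iterator[Any]:
    """Yield rows from an executed DB-API cursor, batch_size rows per fetch."""
    while True:
        batch = cursor.fetchmany(batch_size)
        if not batch:  # an empty sequence means the result set is exhausted
            break
        yield from batch


# sqlite3 stands in for nzpy: both implement the PEP 249 cursor interface
demo_con = sqlite3.connect(":memory:")
demo_cur = demo_con.cursor()
demo_cur.execute("create table t (n integer)")
demo_cur.executemany("insert into t values (?)", [(i,) for i in range(10)])
demo_cur.execute("select n from t order by n")
rows = list(iter_rows(demo_cur, batch_size=3))
demo_con.close()
```

Because `fetchmany()` is part of PEP 249, the same helper should work on an nzpy cursor after `cursor.execute(...)`.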

### Working with the Netezza database

The `con` and `con.cursor()` objects can be used to interact with the Netezza database. A [pandas.DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html) is handy for quick result visualization, in addition to its main role as the foundation of data science in Python. For example:

In [None]:
import pandas as pd
result = pd.read_sql_query("select database, owner, createdate from _v_database where owner like 'ADMIN'", con)
# make result column names human friendly
result.columns = [c.decode().lower() for c in result.columns]
result
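The cells in this notebook repeat `[c.decode().lower() for c in df.columns]` because, as that code implies, column names come back through `nzpy` as upper-case bytes. A small helper (hypothetical, not part of nzpy or pandas) can make that step reusable and also tolerate names that are already `str`:

```python
from typing import Iterable, List, Union


def normalize_columns(columns: Iterable[Union[bytes, str]]) -> List[str]:
    """Decode bytes column names (as returned via nzpy) and lower-case them."""
    return [
        (c.decode() if isinstance(c, bytes) else str(c)).lower()
        for c in columns
    ]
```

It would be used as `result.columns = normalize_columns(result.columns)`.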

`nzpy` also supports streaming data loads and unloads, and there are several variations on this, covered below.

#### Load or unload regular flat files 

First, create a table to load data into. The cell below drops the `ORDERS` table in the `NZPY_TEST` database if it exists and creates it again.

In [None]:
cursor.execute("drop table nzpy_test..ORDERS if exists")

cursor.execute('''
CREATE TABLE ORDERS  ( O_ORDERKEY       INTEGER NOT NULL PRIMARY KEY,
                           O_CUSTKEY        INTEGER NOT NULL,
                           O_ORDERSTATUS    CHAR(1) NOT NULL,
                           O_TOTALPRICE     DECIMAL(15,2) NOT NULL,
                           O_ORDERDATE      DATE NOT NULL,
                           O_ORDERPRIORITY  CHAR(15) NOT NULL,
                           O_CLERK          CHAR(15) NOT NULL,
                           O_SHIPPRIORITY   INTEGER NOT NULL,
                           O_COMMENT        VARCHAR(79) NOT NULL)
                           ORGANIZE ON NONE;
''')
print("Table ORDERS created")        

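Loading data from a local file into NPS uses a streaming transient external table with the `REMOTESOURCE` option, which tells Netezza that the file is on the client rather than on the Netezza host. The examples in this notebook use `REMOTESOURCE 'odbc'`: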

In [None]:
# Change the path below to wherever your orders.tbl file is located.
with con.cursor() as cursor:
    cursor.execute('''
        insert into nzpy_test..ORDERS
            select * from external '/project_data/data_asset/orders.tbl'
                using (
                    delim '|' 
                    remotesource 'odbc'
                    skiprows 1)''')
    # cursor.rowcount reports the number of rows loaded
    print(f"{cursor.rowcount} rows inserted")
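Every load and unload in this notebook spells out its `USING (...)` options by hand. A minimal sketch of a hypothetical helper that renders such a clause from a Python dict is below: strings are single-quoted, integers stay bare as in `skiprows 1`, and booleans become a bare `yes`/`no` as in `includeheader yes`. It only formats text; whether Netezza accepts a given option or value is still up to the server.

```python
from typing import Mapping, Union


def using_clause(options: Mapping[str, Union[str, int, bool]]) -> str:
    """Render external-table options as a USING (...) clause.

    Strings are single-quoted with embedded quotes doubled, ints are left
    bare (as in `skiprows 1`), and bools become bare yes/no (as in
    `includeheader yes`).
    """
    parts = []
    for name, value in options.items():
        if isinstance(value, bool):  # check bool first: bool is a subclass of int
            rendered = "yes" if value else "no"
        elif isinstance(value, int):
            rendered = str(value)
        else:
            rendered = "'" + str(value).replace("'", "''") + "'"
        parts.append(f"{name} {rendered}")
    return "using (" + " ".join(parts) + ")"


# The load statement from the cell above, rebuilt with the helper
load_sql = (
    "insert into nzpy_test..ORDERS "
    "select * from external '/project_data/data_asset/orders.tbl' "
    + using_clause({"delim": "|", "remotesource": "odbc", "skiprows": 1})
)
```

`cursor.execute(load_sql)` would then run the same load as the cell above.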

The same works in reverse to unload data into a local file:

In [None]:
with con.cursor() as cursor:
    cursor.execute('''
      create external table '/tmp/orders.tbl'
        using (
            delim '|' 
            remotesource 'odbc'
            includeheader yes
        ) as select * from nzpy_test..orders limit 20''')
# Read the unloaded file back; includeheader writes the column names on the first line
pd.read_csv('/tmp/orders.tbl', delimiter='|')

#### Loading from other data sources

Other data sources, such as external servers or GitHub, fit into this model by streaming the data through the Python application itself: the application reads from the external source and writes into a named pipe, and `nzpy` loads the data from the other end of that pipe.

_(Note: The named pipe method below works on Linux and macOS. On Windows, the `win32pipe` module can be used to achieve the same.)_

Creating one named pipe for all streaming loads lets a background thread push data into it, while the other end is read by Netezza through an external table.


In [None]:
import os
import pathlib

datapipe = pathlib.Path("/tmp/datapipe")

def create_datapipe():
    # Remove any pipe (or file) left over from an earlier run
    if datapipe.exists():
        print(f"Cleaning prior {datapipe}")
        datapipe.unlink()

    print("Initializing fifo", end='..')
    os.mkfifo(datapipe)
    print('Done')
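Opening a named pipe for writing blocks until some process opens it for reading, and vice versa, which is why the writer and the reader have to run concurrently. A minimal, Netezza-free sketch of that behaviour (POSIX only, since it relies on `os.mkfifo`) uses a background thread as the writer and the main thread as a stand-in for the external-table reader; `fifo_roundtrip` is a hypothetical name.

```python
import os
import tempfile
import threading


def fifo_roundtrip(payload: bytes) -> bytes:
    """Push payload through a named pipe from a writer thread and read it back."""
    with tempfile.TemporaryDirectory() as tmp:
        pipe_path = os.path.join(tmp, "demo_pipe")
        os.mkfifo(pipe_path)

        def writer() -> None:
            # open() blocks here until the reader below opens the other end
            with open(pipe_path, "wb") as pipe:
                pipe.write(payload)
            # closing the write end delivers EOF to the reader

        t = threading.Thread(target=writer)
        t.start()
        with open(pipe_path, "rb") as pipe:  # plays the role of Netezza's reader
            received = pipe.read()
        t.join()
        return received
```

Doing the write in the main thread instead would block forever in `open()`, because nothing would ever open the read end.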

The `requests` module, coupled with `shutil.copyfileobj`, can be used to truly stream the data rather than spool it to disk first. Here's an example that streams a file GitHub serves gzip-compressed and decompresses it on the fly:

In [None]:
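import requests, shutil, gzip

def load_published_dataset(ds):
    # Opening the pipe for writing blocks until Netezza opens it for reading
    with open(datapipe, "wb") as pipe:
        with requests.get(ds, stream=True) as r:
            r.raise_for_status()
            # r.raw is the undecoded (gzip) response body; decompress while copying
            with gzip.GzipFile(fileobj=r.raw) as unzip:
                shutil.copyfileobj(unzip, pipe)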
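`gzip.GzipFile` decompresses lazily as `shutil.copyfileobj` reads from it, so the whole file never has to sit in memory or on disk. That decompress-while-copying step can be checked locally, with in-memory buffers standing in for the HTTP response body and the pipe; `copy_gunzipped` is a hypothetical name for it.

```python
import gzip
import io
import shutil
from typing import BinaryIO


def copy_gunzipped(src: BinaryIO, dst: BinaryIO, chunk_size: int = 64 * 1024) -> None:
    """Decompress a gzip stream from src into dst, chunk_size bytes at a time."""
    with gzip.GzipFile(fileobj=src) as unzip:
        shutil.copyfileobj(unzip, dst, chunk_size)


# In-memory buffers stand in for r.raw and the named pipe
original = b"mpg,cylinders\n18,8\n15,8\n" * 1000
compressed = io.BytesIO(gzip.compress(original))
out = io.BytesIO()
copy_gunzipped(compressed, out)
```

`load_published_dataset` assumes the server gzip-compresses its response, in which case `r.raw` holds the compressed bytes. If a source might not compress its response, a common alternative is to set `r.raw.decode_content = True` and copy `r.raw` straight into the pipe, letting urllib3 undo whatever content encoding was applied.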


Connecting the two together, with an external table over the named pipe, looks like this:

_(Note: the writer runs in a separate thread so that the nzpy call reading the pipe and the data stream feeding it do not block each other.)_

The `Cursor.rowcount` attribute reports the number of rows loaded.

In [None]:
cursor=con.cursor()
cursor.execute('drop table nzpy_test..CARS if exists')
cursor.execute('''
                    CREATE TABLE nzpy_test..CARS  ( MPG       NUMERIC(5,2),
                           CYLINDERS        INTEGER,
                           ENGINE    NUMERIC(5,2),
                           HORSEPOWER     INTEGER,
                           WEIGHT      INTEGER,
                           ACCELERATION  NUMERIC(5,2),
                           YEAR          INTEGER,
                           ORIGIN   VARCHAR(10),
                           NAME        VARCHAR(40))
                           ORGANIZE ON NONE;
                '''
              )

In [None]:
import threading

create_datapipe()
if datapipe.exists() and datapipe.is_fifo():
    print("Pipe ready")

source = 'https://raw.githubusercontent.com/ibm-watson-data-lab/open-data/master/cars/cars.csv'
streamer = threading.Thread(target=load_published_dataset, args=(source,))
streamer.start()

with con.cursor() as cursor:                            
    cursor.execute(f''' 
        insert into nzpy_test..cars select * from external '{datapipe}' 
            using (
                delim ','
                remotesource 'odbc'
                skiprows 1
            )''')
    print(f"{cursor.rowcount} rows inserted")
    
streamer.join()



Doing this in reverse, with a file instead of a named pipe as the external table, unloads the data. `pandas.read_sql` can also read the output of a query directly into a `pandas.DataFrame` for further analytics.

### Data ingestion and ELT from object store

NPS can load and unload data from object stores like Amazon S3 and IBM Cloud Object Storage. This works by using Netezza external tables to read from and write to the object store.

Let's look at a few examples that target an AWS S3 bucket. Prerequisites:

- The `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY` and `AWS_REGION` environment variables are set with the correct credentials
- The `BUCKET` environment variable has the correct bucket name
- The bucket contains the `owid-covid-data.csv` file used below

_Note:_ You can configure the bucket's ACLs on AWS as [described here](https://www.ibm.com/support/knowledgecenter/en/SSTNZ3/com.ibm.ips.doc/postgresql/admin/adm_nps_cloud_provisioning_prereq_aws.html) to balance security against NPS's need to read from and write to it.

In [None]:
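import os
import pandas as pd

# Replace the placeholders (the values must not contain spaces)
s = 'AWS_ACCESS_KEY_ID=<AWS_ACCESS_KEY_ID> AWS_SECRET_ACCESS_KEY=<AWS_SECRET_ACCESS_KEY> AWS_REGION=<AWS_REGION> BUCKET=<BUCKET_NAME>'
# Split each KEY=VALUE pair on the first '=' only
os.environ.update(dict(i.split('=', 1) for i in s.split()))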

In [None]:
# Set up the table schema to match the columns of owid-covid-data.csv
schema = '''
    iso_code varchar (16),
    continent varchar(48),
    location varchar(48),
    covid_date date,
    total_cases numeric(32, 20),
    new_cases numeric(32, 20),
    new_cases_smoothed numeric(32, 20),
    total_deaths numeric(32, 20),
    new_deaths numeric(32, 20),
    new_deaths_smoothed numeric(32, 20),
    total_cases_per_million numeric(32, 20),
    new_cases_per_million numeric(32, 20),
    new_cases_smoothed_per_million numeric(32, 20),
    total_deaths_per_million numeric(32, 20),
    new_deaths_per_million numeric(32, 20),
    new_deaths_smoothed_per_million numeric(32, 20),
    new_tests numeric(32, 20),
    total_tests numeric(32, 20),
    total_tests_per_thousand numeric(32, 20),
    new_tests_per_thousand numeric(32, 20),
    new_tests_smoothed numeric(32, 20),
    new_tests_smoothed_per_thousand numeric(32, 20),
    tests_per_case numeric(32, 20),
    positive_rate numeric(32, 20),
    tests_units varchar(32),
    stringency_index numeric(32, 20),
    population numeric(32, 20),
    population_density numeric(32, 20),
    median_age numeric(32, 20),
    aged_65_older numeric(32, 20),
    aged_70_older numeric(32, 20),
    gdp_per_capita numeric(32, 20),
    extreme_poverty numeric(32, 20),
    cardiovasc_death_rate numeric(32, 20),
    diabetes_prevalence numeric(32, 20),
    female_smokers numeric(32, 20),
    male_smokers numeric(32, 20),
    handwashing_facilities numeric(32, 20),
    hospital_beds_per_thousand numeric(32, 20),
    life_expectancy numeric(32, 20),
    human_development_index numeric(32, 20)'''

# Query the file in S3 directly, without loading it, to check that everything works
df = pd.read_sql(f'''
    select distinct continent
    from external 'owid-covid-data.csv' ({schema})
    using (
        remotesource 'S3' 
        delim ','
        uniqueid 'covid' 
        accesskeyid '{os.environ["AWS_ACCESS_KEY_ID"]}'
        secretaccesskey '{os.environ["AWS_SECRET_ACCESS_KEY"]}'
        defaultregion '{os.environ["AWS_REGION"]}'
        bucketurl '{os.environ["BUCKET"]}'
        skiprows 1
    ) where continent is not null and continent != '' ''', con)
df.columns = [c.decode().lower() for c in df.columns]
df
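A schema this long is tedious to type. One possible shortcut, sketched under the assumption that most columns share a default type, generates the column list from the CSV's header line and lets a few columns override that type. The helper name `schema_from_header` is hypothetical, and the generated types still need checking against the data. Column order follows the header, since the fields of a delimited file are matched to the columns by position.

```python
import csv
import io
from typing import Mapping, Optional


def schema_from_header(header_line: str,
                       overrides: Optional[Mapping[str, str]] = None,
                       default_type: str = "numeric(32, 20)") -> str:
    """Build a comma-separated 'name type' column list from a CSV header line.

    Columns not listed in `overrides` get `default_type`; order follows the header.
    """
    overrides = overrides or {}
    # csv.reader handles quoted header fields correctly
    names = [n.strip() for n in next(csv.reader(io.StringIO(header_line)))]
    return ",\n".join(f"    {name} {overrides.get(name, default_type)}" for name in names)
```

For example, `schema_from_header(header, {'iso_code': 'varchar(16)', 'continent': 'varchar(48)', 'location': 'varchar(48)'})`, where `header` is the first line of the CSV file.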

### Load data from S3 to Netezza

In [None]:
table = 'covid'
with con.cursor() as cur:
    # Drop any old table; IF EXISTS avoids a case-sensitive lookup in _v_table
    cur.execute(f'drop table {table} if exists')
        
    # create a table to load data
    cur.execute(f'create table {table} ({schema})')
    print(f"Table {table} created")
    
    # load data from object store
    cur.execute(f'''
    insert into {table} 
        select * from external 'owid-covid-data.csv' ({schema})
        using (
            remotesource 'S3' 
            delim ','
            uniqueid 'covid' 
            accesskeyid '{os.environ["AWS_ACCESS_KEY_ID"]}'
            secretaccesskey '{os.environ["AWS_SECRET_ACCESS_KEY"]}'
            defaultregion '{os.environ["AWS_REGION"]}'
            bucketurl '{os.environ["BUCKET"]}'
            skiprows 1
        )''')
    print(f"{cur.rowcount} Rows loaded")
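The two statements above repeat the same S3 options. A minimal sketch of a hypothetical helper that builds those options from the environment variables listed in the prerequisites, and fails early with a clear message when one is missing, is below. As in the original cells, the secret key still ends up in the SQL text.

```python
import os
from typing import Mapping

# Maps each external-table option used in the cells above to its environment variable
S3_OPTION_ENV = {
    "accesskeyid": "AWS_ACCESS_KEY_ID",
    "secretaccesskey": "AWS_SECRET_ACCESS_KEY",
    "defaultregion": "AWS_REGION",
    "bucketurl": "BUCKET",
}


def s3_options(env: Mapping[str, str] = os.environ, uniqueid: str = "covid") -> str:
    """Return the S3-related USING options from the cells above, one per line."""
    missing = [var for var in S3_OPTION_ENV.values() if not env.get(var)]
    if missing:
        raise KeyError(f"set these environment variables first: {', '.join(missing)}")
    opts = ["remotesource 'S3'", f"uniqueid '{uniqueid}'"]
    opts += [f"{opt} '{env[var]}'" for opt, var in S3_OPTION_ENV.items()]
    return "\n".join(opts)
```

Inside the f-string query, the options block would then shrink to `using ({s3_options()} delim ',' skiprows 1)`.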

In [None]:
# Week-over-week trend: the peak daily new cases in each week, per continent
df = pd.read_sql_query('''
    select continent,
            this_week(covid_date) as wk,
            max(new_cases) as total 
    from covid 
    where 
        continent is not null and
        continent != ''
    group by wk, continent
    order by wk, continent
    ''', con,
    parse_dates = {b'WK': '%Y-%m-%d'})
df.columns = [c.decode().lower() for c in df.columns]
df.total = df.total.astype(float)
df.head()

In [None]:
# Let's visualize the same data
from mizani.formatters import date_format
from plotnine import *

( ggplot(df, aes(x='wk', y='total', color='continent')) + geom_line() + geom_point() + 
  labs(y = "Peak daily new cases", x = "Week") + facet_wrap('continent') + 
   scale_x_datetime(labels=date_format('%b %-d')) +
   theme(axis_text_x=element_text(rotation=60, hjust=1))
)

## A real-life example

In this example, let's use Python and Netezza Performance Server to load and analyze [publicly available data on Australian temperatures and rainfall](https://github.com/rfordatascience/tidytuesday/blob/master/data/2020/2020-01-07/readme.md).

The best practices are:

- Load the data into Netezza
- Do as much of the filtering, transformation and analytics as possible in the database
- Do the last step, visualization and final analytics, in Python by extracting the much smaller result of the previous step
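The point of the second bullet is that only the small, aggregated result has to travel to Python. A toy illustration follows, using the standard library's `sqlite3` purely as a stand-in for Netezza (the `temps` table and its numbers are made up for the example); it compares aggregating in SQL with pulling every row and aggregating in Python.

```python
import sqlite3
from collections import defaultdict

demo = sqlite3.connect(":memory:")
demo.execute("create table temps (city text, month integer, temperature real)")
demo.executemany(
    "insert into temps values (?, ?, ?)",
    [(city, m, 10.0 + m + (5 if city == "SYDNEY" else 0))
     for city in ("SYDNEY", "MELBOURNE") for m in range(1, 13) for _ in range(100)],
)

# Pushed down: the database aggregates, one row per (city, month) comes back
in_db = demo.execute(
    "select city, month, avg(temperature) from temps group by city, month"
).fetchall()

# Pulled up: every raw row is transferred, then aggregated client-side
raw = demo.execute("select city, month, temperature from temps").fetchall()
sums = defaultdict(lambda: [0.0, 0])
for city, month, t in raw:
    sums[(city, month)][0] += t
    sums[(city, month)][1] += 1
in_python = {key: total / n for key, (total, n) in sums.items()}
demo.close()
```

Both approaches give the same averages, but the pushed-down query transfers 24 rows instead of 2,400; on a real Netezza table the gap is usually far larger.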

### Step 1 - Dataset

Let's look at the dataset first; the tables are set up in the next step. The data represents the [Australian weather station temperature and rainfall data as of Jan 1 2020](https://github.com/rfordatascience/tidytuesday/blob/master/data/2020/2020-01-07/readme.md).

There are two tables:

**temperature.csv**

|column    |type     |description |
|:-----------|:---------|:-----------|
|city_name   |VARCHAR(20) | City Name|
|date        |DATE    | Date |
|temperature |NUMERIC(8,3)    | Temperature in Celsius |
|temp_type   |VARCHAR(10) | Temperature type (min/max daily) |
|site_name   |VARCHAR(30) | Actual site/weather station|

**rainfall.csv**

|column     |type     |description |
|:------------|:---------|:-----------|
|station_code |INT | Station Code |
|city_name    |VARCHAR(20) | City Name |
|year         |INT    | Year |
|month        |INT  | Month |
|day          |INT | Day |
|rainfall     |NUMERIC(8,3)    | Rainfall in millimeters |
|period       |INT    | Number of days the measurement was collected over |
|quality      |VARCHAR(5) | Certified quality or not |
|lat          |NUMERIC(5,2)    | Latitude |
|long         |NUMERIC(5,2)    | Longitude |
|station_name |VARCHAR(100) | Station Name |


### Step 2 - Set up the tables

In [None]:
with con.cursor() as cursor:
    # find which of the tables already exist
    existing = { table[0] for table in cursor.execute("select lower(tablename) from _v_table where lower(tablename) in ('rainfall', 'temperature')").fetchall() }
    if 'temperature' not in existing:
        # create only if they don't
        cursor.execute('''
        create table nzpy_test..temperature (
            city_name     VARCHAR(20),
            date          DATE,
            temperature   NUMERIC(8,3),
            temp_type     VARCHAR(10),
            site_name     VARCHAR(30)
        ) distribute on (city_name)
        ''')
        print("Table temperature created")
        
    if 'rainfall' not in existing:
        cursor.execute('''
        create table nzpy_test..rainfall (
            station_code  INT,
            city_name     VARCHAR(20),
            year          INT,
            month         INT,
            day           INT,
            rainfall      NUMERIC(8,3),
            period        INT,
            quality       VARCHAR(5),
            lat           NUMERIC(5,2),
            long          NUMERIC(5,2),
            station_name  VARCHAR(100)
        ) distribute on (city_name, station_code)
        ''')
        print("Table rainfall created")

Now stream the data into both tables:

In [None]:
create_datapipe()

rainfall = "https://raw.githubusercontent.com/rfordatascience/tidytuesday/master/data/2020/2020-01-07/rainfall.csv"
loader = threading.Thread(target=load_published_dataset, args=(rainfall,))
loader.start()

with con.cursor() as cursor:
    print("Loading data", end=".. ")
    result = cursor.execute(f'''INSERT INTO nzpy_test..rainfall SELECT * FROM EXTERNAL '{datapipe}'
                                USING (
                                    DELIMITER ','
                                    REMOTESOURCE 'ODBC'
                                    NULLVALUE 'NA'
                                    SKIPROWS 1)''')
    print(f"{cursor.rowcount} rows inserted")
    
loader.join()

In [None]:
create_datapipe()
temperature = "https://raw.githubusercontent.com/rfordatascience/tidytuesday/master/data/2020/2020-01-07/temperature.csv"

loader = threading.Thread(target=load_published_dataset, args=(temperature,))
loader.start()

with con.cursor() as cursor:
    print("Loading data", end=".. ")
    result = cursor.execute(f'''INSERT INTO nzpy_test..temperature SELECT * FROM EXTERNAL '{datapipe}'
                                USING (
                                    DELIMITER ','
                                    REMOTESOURCE 'ODBC'
                                    NULLVALUE 'NA'
                                    SKIPROWS 1)''')
    print(f"{cursor.rowcount} rows inserted")
    
loader.join()
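The two loading cells above differ only in the target table and the source URL. A minimal sketch of a reusable version follows, assuming the pipe, thread and `INSERT ... SELECT * FROM EXTERNAL` pattern stays as above. Unlike a bare `threading.Thread`, it re-raises an exception from the writer (a failed download, say) in the notebook instead of letting it disappear in the background thread. `stream_load` and its parameters are hypothetical names, and the offline check at the end uses a fake cursor rather than Netezza.

```python
import os
import pathlib
import tempfile
import threading
from typing import Any, Callable, List, Union


def stream_load(cursor: Any, table: str, pipe_path: Union[str, os.PathLike],
                writer: Callable[[pathlib.Path], None],
                options: str = "delimiter ',' remotesource 'odbc' skiprows 1") -> int:
    """Load rows produced by `writer` into `table` through a fresh named pipe.

    Any exception raised in the writer thread is re-raised here once the
    INSERT has finished, instead of being lost in the background thread.
    """
    path = pathlib.Path(pipe_path)
    if path.exists():
        path.unlink()
    os.mkfifo(path)

    errors: List[BaseException] = []

    def run_writer() -> None:
        try:
            writer(path)
        except BaseException as exc:  # hand the error to the calling thread
            errors.append(exc)

    # daemon=True: if the INSERT fails before Netezza opens the pipe for reading,
    # a writer stuck in open() will not keep the interpreter from exiting
    thread = threading.Thread(target=run_writer, daemon=True)
    thread.start()
    cursor.execute(
        f"insert into {table} select * from external '{path}' using ({options})"
    )
    thread.join()
    if errors:
        raise errors[0]
    return cursor.rowcount


# Offline check: a stand-in cursor that reads the pipe the way Netezza would
class _FakeCursor:
    rowcount = -1

    def execute(self, sql: str) -> None:
        pipe = sql.split("external '")[1].split("'")[0]
        with open(pipe, "rb") as f:
            self.rowcount = f.read().count(b"\n")


def _write_rows(p: pathlib.Path) -> None:
    with open(p, "wb") as f:
        f.write(b"1,a\n2,b\n3,c\n")


with tempfile.TemporaryDirectory() as tmp:
    demo_count = stream_load(_FakeCursor(), "demo_table", pathlib.Path(tmp) / "pipe", _write_rows)
```

In the notebook, a call could look like `stream_load(cursor, "nzpy_test..rainfall", datapipe, lambda _: load_published_dataset(rainfall), options="delimiter ',' remotesource 'odbc' nullvalue 'NA' skiprows 1")`; the lambda ignores its path argument because `load_published_dataset` always writes to the global `datapipe`.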

### Step 3 - Analyze the data

First, analyze the data in the database by averaging the temperatures by month and decade, which reduces the dataset to a small summary. After that, visualization and further analytics can easily be done in Python.

In [None]:
df = pd.read_sql('''
    select extract(decade from date) * 10 as decade,
           extract(month from date) as month,
           city_name,
           avg(temperature) as avg
        from nzpy_test..temperature
        where city_name in ('SYDNEY', 'MELBOURNE')
        group by month, decade, city_name
        order by month
 ''', con)

df.columns = [c.decode().lower() for c in df.columns]
df.decade = df.decade.astype(int)
df.avg = df.avg.astype(float)
df.month = df.month.astype(int)
df

Now we can combine the `DataFrame` with `ggplot` to see how the average temperatures of the two cities over the year stack up across all decades of the last 100 years.

In [None]:
import matplotlib, calendar
from plotnine import *
%matplotlib inline

( ggplot(df, aes(x='month', y='avg', group='decade', color='decade')) + geom_line() + geom_point() + 
  labs(y = "Average Temperature (°C)", x = "Month") + facet_wrap('city_name') +
   scale_color_gradient(low="blue", high="red") + 
   scale_x_continuous(breaks=list(range(1, 13)), labels=list(calendar.month_abbr[1:])) +  # month is numeric (1-12)
   theme(axis_text_x=element_text(rotation=60, hjust=1))
)

## Next Steps

So far we have interacted with Netezza using DDL and DML SQL statements, loading data from various sources such as local CSV files, files stored on GitHub, and objects in S3.

In the next notebook we will be learning more about the analytical and machine learning capabilities of Netezza Performance Server.