# Data transformation, cleaning and loading with Python

## WaterInfo Data Set
This week we work with a water data set covering the Murray River Basin in NSW. The content of the original Excel workbook is available as a set of four CSV files. Please upload these CSV files to Jupyter first.

**Important:** Make sure that the naming of all the files is as follows:
 1. Measurements.csv
 2. Organisations.csv
 3. Sensors.csv
 4. Stations.csv

## EXERCISE 1: Data Loading and Database Creation with Python

We continue with the same Python tooling: the `DictReader` class from the `csv` module, which supports reading and writing files in comma-separated values (CSV) format.

Make sure that you have uploaded the 'Organisations.csv' file into Jupyter.
We will first load the content of this file into Python with the same `csv.DictReader()` mechanism:

In [15]:
import csv
import pprint
data_organisations = list(csv.DictReader(open('Organisations.csv')))
pprint.pprint(data_organisations[0])

{'Code': 'DNR',
 'Organisation': 'NSW Department of Water and Energy (and predecessors)'}
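
To see what `csv.DictReader` produces without depending on the uploaded file, here is a minimal sketch that parses a small in-memory CSV text. The sample text and the names `sample_csv` and `sample_rows` are made up for illustration; only the header names `Code` and `Organisation` are taken from the output above.

```python
import csv
import io

# A tiny stand-in for Organisations.csv (illustrative content, not the real file)
sample_csv = (
    "Code,Organisation\n"
    "DNR,NSW Department of Water and Energy (and predecessors)\n"
    "MIL,Murray Irrigation Ltd\n"
)

# DictReader uses the first line as keys and yields one dictionary per data line
sample_rows = list(csv.DictReader(io.StringIO(sample_csv)))

print(len(sample_rows))
print(sample_rows[1]['Code'], '->', sample_rows[1]['Organisation'])
```

Every value arrives as a string, which is why numeric columns need a conversion step before they are stored as numbers.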


For larger data sets, the following would normally be executed as a standalone Python program from a shell.
First, you need to establish a connection to the PostgreSQL database.
__Please edit the database name, username and password entries in the Credentials.json file to match your database login; they are read into the YOUR_DBNAME, YOUR_USERNAME and YOUR_PW variables below. (Refer to the sample JSON file provided.)__

In [16]:
import psycopg2
import json

# read the login details from the credentials file
with open('Credentials.json') as f:
    db_conn_dict = json.load(f)
YOUR_DBNAME = db_conn_dict['database']
YOUR_USERNAME = db_conn_dict['user']
YOUR_PW = db_conn_dict['password']

def pgconnect():
    """ connect to the local PostgreSQL server; returns None on failure """
    conn = None  # stays None if the connection attempt fails
    try:
        conn = psycopg2.connect(host='localhost',
                                database=YOUR_DBNAME,
                                user=YOUR_USERNAME,
                                password=YOUR_PW)
        print('connected')
    except Exception as e:
        print("unable to connect to the database")
        print(e)
    return conn
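
The layout of the provided sample file is not reproduced here. Judging from the keys read in the cell above, it is presumably a flat JSON object with the entries `database`, `user` and `password`. Under that assumption, the following sketch shows one way to check the file before connecting. The helper `load_credentials` and the throwaway demo file are hypothetical and not part of the lab material.

```python
import json
import os
import tempfile

REQUIRED_KEYS = ('database', 'user', 'password')  # the keys the cell above reads

def load_credentials(path):
    """Hypothetical helper: read the JSON file and fail early if a key is missing."""
    with open(path) as f:
        creds = json.load(f)
    missing = [k for k in REQUIRED_KEYS if k not in creds]
    if missing:
        raise KeyError('missing entries in credentials file: ' + ', '.join(missing))
    return creds

# Demonstration with a throwaway file holding placeholder values only
tmp_dir = tempfile.mkdtemp()
demo_path = os.path.join(tmp_dir, 'Credentials.json')
with open(demo_path, 'w') as f:
    json.dump({'database': 'YOUR_DBNAME',
               'user': 'YOUR_USERNAME',
               'password': 'YOUR_PW'}, f)

demo_creds = load_credentials(demo_path)
print(sorted(demo_creds.keys()))
```

Failing early with a clear message is usually easier to debug than a `KeyError` or a rejected login further down the notebook.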

We will need to execute SQL statements against the database. As we will do so multiple times, we write a dedicated function for executing an arbitrary SQL statement for which we do not expect any result. The function also handles all failures and, by using psycopg2's `with` statements, the transaction processing of the database. For example, the code below automatically commits our SQL statements and rolls back if there was an error.

In [17]:
def pgexec( conn, sqlcmd, args, msg, silent=False ):
   """ utility function to execute some SQL statement
       can take optional arguments to fill in (dictionary)
       error and transaction handling built-in """
   retval = False
   with conn:
      with conn.cursor() as cur:
         try:
            if args is None:
               cur.execute(sqlcmd)
            else:
               cur.execute(sqlcmd, args)
            if silent == False: 
                print("success: " + msg)
            retval = True
         except Exception as e:
            if silent == False: 
                print("db error: ")
                print(e)
   return retval
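
The commit-or-rollback behaviour can be tried out without a PostgreSQL server. The sketch below uses Python's built-in `sqlite3` module as a stand-in, because its connection objects can also be used in a `with` statement that commits on success and rolls back when an exception escapes the block. This only illustrates the idea and is not psycopg2 itself; the helper `try_insert` and the sample rows are made up. Note that here the `try` wraps the `with` block, so the exception reaches the connection before it is reported.

```python
import sqlite3

# in-memory database as a stand-in for the PostgreSQL server
demo_conn = sqlite3.connect(":memory:")
demo_conn.execute("CREATE TABLE Organisation (code TEXT PRIMARY KEY, orgName TEXT)")
demo_conn.commit()

def try_insert(conn, rows):
    """Insert all rows in one transaction; undo everything if one of them fails."""
    try:
        with conn:  # commit on success, rollback if an exception escapes
            for code, name in rows:
                conn.execute("INSERT INTO Organisation VALUES (?, ?)", (code, name))
        return True
    except sqlite3.IntegrityError as e:
        print("db error:", e)
        return False

ok_first = try_insert(demo_conn, [("DNR", "first"), ("MIL", "second")])
# the second batch repeats the key 'DNR', so the whole batch is rolled back
ok_second = try_insert(demo_conn, [("SCA", "third"), ("DNR", "duplicate")])

row_count = demo_conn.execute("SELECT COUNT(*) FROM Organisation").fetchone()[0]
print(ok_first, ok_second, row_count)
```

After the failed batch, the row for 'SCA' is gone again, which is the all-or-nothing behaviour a transaction is meant to provide.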

Now let's load our previous data.
Important: whenever you use this approach, make sure that the header line of your CSV file has no spaces and no quotes in its column titles. Otherwise csv.DictReader might read the file without complaint, but psycopg2's cursor.execute() function will not find the keys it is looking for. We are using named placeholders in our INSERT statement below (e.g. '%(SiteName)s'): each one tells execute() to look up the key 'SiteName' in the given dictionary and to put that value (%s) into that place of the INSERT statement.
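
A quick way to catch header problems before touching the database is to compare the placeholder names in the statement with the keys of a row. The sketch below does this with a regular expression. The helper `missing_placeholder_keys` is hypothetical, and the second row simulates a CSV header with a stray leading space.

```python
import re

def missing_placeholder_keys(sql, row):
    """Hypothetical helper: list the %(name)s placeholders of sql that row lacks."""
    names = re.findall(r'%\((\w+)\)s', sql)
    return [name for name in names if name not in row]

stmt = "INSERT INTO Organisation(code,orgName) VALUES (%(Code)s, %(Organisation)s)"

good_row = {'Code': 'DNR',
            'Organisation': 'NSW Department of Water and Energy (and predecessors)'}
# simulated row from a CSV whose header was written as "Code, Organisation"
bad_row = {'Code': 'DNR', ' Organisation': 'header had a leading space'}

print(missing_placeholder_keys(stmt, good_row))
print(missing_placeholder_keys(stmt, bad_row))
```

An empty list means every placeholder can be filled from the row; any name that is reported points at a column title that needs fixing in the CSV header.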

In [18]:
# 1st: login to database
conn = pgconnect()

# if you want to reset the table
pgexec (conn, "DROP TABLE IF EXISTS Organisation", None, "Reset Table Organisation")

# 2nd: ensure that the schema is in place

organisation_schema = """CREATE TABLE IF NOT EXISTS Organisation (
                         code VARCHAR(20) PRIMARY KEY,
                         orgName VARCHAR(150)
                   )"""
pgexec (conn, organisation_schema, None, "Create Table Organisation")

# 3rd: load data
# IMPORTANT: make sure the header line of CSV is without spaces!
insert_stmt = """INSERT INTO Organisation(code,orgName) VALUES (%(Code)s, %(Organisation)s)"""
for row in data_organisations:
    pgexec (conn, insert_stmt, row, "row inserted")



connected
db error: 
cannot drop table organisation because other objects depend on it
DETAIL:  constraint orgcodefk on table station depends on table organisation
HINT:  Use DROP ... CASCADE to drop the dependent objects too.

success: Create Table Organisation
db error: 
column "orgname" of relation "organisation" does not exist
LINE 1: INSERT INTO Organisation(code,orgName) VALUES ('DNR', 'NSW D...
                                      ^

db error: 
column "orgname" of relation "organisation" does not exist
LINE 1: INSERT INTO Organisation(code,orgName) VALUES ('DWR', 'NSW D...
                                      ^

db error: 
column "orgname" of relation "organisation" does not exist
LINE 1: INSERT INTO Organisation(code,orgName) VALUES ('MIL', 'Murra...
                                      ^

db error: 
column "orgname" of relation "organisation" does not exist
LINE 1: INSERT INTO Organisation(code,orgName) VALUES ('PWD', 'Manly...
                                      ^

db er

Next, let's check whether this has all worked by querying our PostgreSQL database. To do so, we first introduce another utility function, which again encapsulates all error and transaction handling. Then we query the new Organisation table and simply print out all tuples found.

In [19]:
def pgquery( conn, sqlcmd, args, silent=False ):
   """ utility function to execute some SQL query statement
       can take optional arguments to fill in (dictionary)
       will print out on screen the result set of the query
       error and transaction handling built-in """
   retval = False
   with conn:
      with conn.cursor() as cur:
         try:
            if args is None:
                cur.execute(sqlcmd)
            else:
                cur.execute(sqlcmd, args)
            if silent == False:
                for record in cur:
                    print(record)
            retval = True
         except Exception as e:
            if silent == False:
                print("db read error: ")
                print(e)
   return retval

In [20]:

# check content of Organisation table
query_stmt = "SELECT * FROM Organisation"
print(query_stmt)
pgquery (conn, query_stmt, None)

# cleanup...   Needed already?  Better not now... 
# But keep in mind to close connection eventually!
# conn.close()

SELECT * FROM Organisation
('DNR', 'NSW Department of Water and Energy (and predecessors)')
('DWR', 'NSW Department of Water and Energy (and predecessors)')
('MIL', 'Murray Irrigation Ltd')
('PWD', 'Manly Hydraulics Laboratory')
('QWR', 'Qld Department of Natural Resources and Water')
('SCA', 'Sydney Catchment Authority')
('SMA', 'Snowy Mountains Authority')
('SWB', 'Sydney Catchment Authority')
('VRW', 'Vic Government')


True

## Your Task: Data Loading

Try to create and load the Measurement table.

 1. Read the Measurements CSV file
 2. Create a matching 'Measurement' table to hold the CSV data
 3. Load the content of the CSV file into a local 'data_measurements' list of dictionaries in Python
 4. Load the data from 'data_measurements' into your PostgreSQL table
 5. Query and print its content

In [21]:
# TODO: replace the content of this cell with your Python + psycopg2 solution
raise NotImplementedError



NotImplementedError: 

# STOP PLEASE. THE FOLLOWING IS FOR THE NEXT EXERCISE. THANKS.

## EXERCISE 2: Data Cleaning

### Data Cleaning
We re-use the clean() function.

In [23]:
import numpy as np
DEFAULT_VALUE = np.nan

def clean(data, column_key, convert_function, default_value):
    special_values= {} # no special values yet
    for row in data:
        old_value = row[column_key]
        new_value = default_value
        try:
            if old_value in special_values.keys():
                new_value = special_values[old_value]
            else:
                new_value = convert_function(old_value)
        except (ValueError, TypeError):
            print('Replacing {} with {} in column {}'.format(row[column_key], new_value, column_key))
        row[column_key] = new_value
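
To see the effect of `clean()` on a few rows before applying it to the real data, here is a minimal sketch with made-up values. It repeats the core of the function so that it runs on its own, and it uses `math.nan` in place of `np.nan` to avoid the NumPy dependency. The station numbers and readings in `demo_rows` are invented.

```python
import math

def clean(data, column_key, convert_function, default_value):
    # same logic as the clean() function above, without the special-value table
    for row in data:
        new_value = default_value
        try:
            new_value = convert_function(row[column_key])
        except (ValueError, TypeError):
            print('Replacing {} with {} in column {}'.format(
                row[column_key], new_value, column_key))
        row[column_key] = new_value

# invented sample rows: padded station code, a text marker and an empty reading
demo_rows = [
    {'Station': ' 409001 ', 'Level': '1.25'},
    {'Station': '409002', 'Level': 'n/a'},
    {'Station': '409003', 'Level': ''},
]

clean(demo_rows, 'Station', str.strip, math.nan)  # trims surrounding spaces
clean(demo_rows, 'Level', float, math.nan)        # unparsable readings become NaN

for row in demo_rows:
    print(row)
```

The rows are modified in place, so running `clean()` a second time on the same column simply converts the already converted values again.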
             

In [24]:
# this conversion strips any leading or trailing spaces from the 'Station' values
clean(data_measurements, 'Station', str.strip, DEFAULT_VALUE)

# the following converts the five measurement columns to float values - or NaN
clean(data_measurements, 'Discharge', float, DEFAULT_VALUE)
clean(data_measurements, 'MeanDischarge', float, DEFAULT_VALUE)
clean(data_measurements, 'Level', float, DEFAULT_VALUE)
clean(data_measurements, 'Temp', float, DEFAULT_VALUE)
clean(data_measurements, 'EC', float, DEFAULT_VALUE)

# now we insert the data_measurements into the 'Measurement' table
# 1st: login to database (close any existing connection first)
if conn:
    conn.close()
conn = pgconnect()

# 2nd: ensure that the schema is in place
pgexec (conn, "DROP TABLE IF EXISTS Measurement", None, "Reset Table Measurement")
measurement_schema = """CREATE TABLE IF NOT EXISTS Measurement (
                         station   VARCHAR(20),
                         date DATE,
                         level  FLOAT,
                         meanDischarge FLOAT,
                         discharge  FLOAT,
                         temp FLOAT,
                         ec  FLOAT
                      )"""
pgexec (conn, measurement_schema, None, "Create Table Measurement")

# 3rd: load data
# IMPORTANT: make sure the header line of CSV is without spaces!
insert_stmt = """INSERT INTO Measurement(station,date,level,meandischarge,discharge,temp,ec)
                      VALUES (%(Station)s, %(Date)s, %(Level)s,%(MeanDischarge)s,%(Discharge)s,%(Temp)s,%(EC)s)"""
for row in data_measurements:
    pgexec (conn, insert_stmt, row, "row inserted")
    
query_stmt = "SELECT COUNT(*) FROM Measurement"
print(query_stmt)
pgquery (conn, query_stmt, None)

NameError: name 'data_measurements' is not defined

## Your Task: Data Cleaning

Use the clean() function above to clean the other given data set too.
 1. Read the Stations CSV file into data_stations
 2. Clean the 'data_stations' data set
 3. Reload the 'data_stations' data into your database
 4. Query the 'Stations' table - which difference do you see?
 5. If you have time: Do all of the above (reading - cleaning - loading) also for the 'Sensors.csv' data set

Note: You might encounter a few warning and error messages.
   - If a connection is closed, you have to open the database connection again first.
   - If the clean() function reports that some string was replaced with NaN, you can ignore this message as long as the column is indeed a numeric attribute. It just tells you that the function is doing what it is supposed to do.
   - If you try to insert data into an already existing table with data inside, you might get 'duplicate primary key' error messages. Again, you can ignore those for the moment.
   - If you want to see how much data is already in your table, use the following SQL query:
     -  SELECT COUNT(*) FROM Stations;

In [None]:
# TODO: replace the content of this cell with your Python solution
raise NotImplementedError

# STOP PLEASE. THE FOLLOWING IS FOR THE NEXT EXERCISE. THANKS.

## EXERCISE 3: Database Creation
Once we have discussed the data model, we will give you an example solution (see below).
The next step is to create the corresponding SQL schema in your PostgreSQL database.

### Your Task: DB Creation in PostgreSQL
Create the corresponding tables in PostgreSQL which follow from the data model.

<pre>
DROP TABLE IF EXISTS Organisation CASCADE;
CREATE TABLE IF NOT EXISTS Organisation (
     code          VARCHAR(20) PRIMARY KEY,
     organisation  VARCHAR(150)
);

DROP TABLE IF EXISTS Station CASCADE;
CREATE TABLE IF NOT EXISTS Station (
     station   VARCHAR(50) PRIMARY KEY,
     siteName  VARCHAR(50),
     commence  DATE,
     orgCode   VARCHAR(50),
     CONSTRAINT orgCodeFK FOREIGN KEY (orgCode) REFERENCES Organisation (code)
);

DROP TABLE IF EXISTS Sensor CASCADE;
CREATE TABLE IF NOT EXISTS Sensor (
     sensor       VARCHAR(20) PRIMARY KEY,
     description  VARCHAR(150),
     metric       VARCHAR(20)
);

DROP TABLE IF EXISTS Measurement CASCADE;
CREATE TABLE IF NOT EXISTS Measurement (
     station  VARCHAR(20),
     sensor   VARCHAR(20),
     date     DATE,
     value    FLOAT,
     CONSTRAINT stationFK FOREIGN KEY (station) REFERENCES Station (station),
     CONSTRAINT sensorFK  FOREIGN KEY (sensor)  REFERENCES Sensor (sensor)
);
</pre>

In [None]:
# TODO: replace the content of this cell with your Python + SQL solution
raise NotImplementedError
# make sure we are still connected to database 


# STOP PLEASE. THE FOLLOWING IS FOR THE NEXT EXERCISE. THANKS.

## EXERCISE 4: Data Loading and Storage

Up to this point, we have
 - analysed and modelled the given data set
 - created a corresponding relational star schema
 - read the individual CSV files into Python dictionary data structures
 - cleaned the raw data with regard to missing or inconsistent entries and data types
 
The final step is to load this cleaned data into the corresponding tables of the star schema which we defined so far.

For this to work, you will probably need to write some logic to load different parts of different data dictionaries (holding the content of CSV files) into different tables.
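
One piece of such logic is turning a wide measurement row (one column per sensor) into several narrow records (one per sensor reading), which is the shape the Measurement table expects. The sketch below shows this step on its own, without a database. The column-to-sensor-code pairs are the ones used in the loading cell of this exercise; the helper `unpivot` and the values in `demo_row` are made up for illustration, and the row is assumed to have been cleaned already so that missing readings are NaN.

```python
import math

# column name in the CSV -> sensor code, as paired in the loading cell
SENSOR_BY_COLUMN = {
    'Level': 'levl',
    'MeanDischarge': 'disvol',
    'Discharge': 'disc',
    'Temp': 'temp',
    'EC': 'ec',
}

def unpivot(row):
    """Hypothetical helper: yield one record per non-missing reading of a cleaned row."""
    for column, sensor in SENSOR_BY_COLUMN.items():
        value = row[column]
        if math.isnan(value):
            continue  # no reading for this sensor in this row
        yield {'station': row['Station'], 'date': row['Date'],
               'sensor': sensor, 'value': value}

# invented, already cleaned sample row with two missing readings
demo_row = {'Station': '409001', 'Date': '2008-01-01', 'Level': 1.25,
            'MeanDischarge': math.nan, 'Discharge': 310.0,
            'Temp': math.nan, 'EC': 52.0}

records = list(unpivot(demo_row))
for record in records:
    print(record)
```

Each yielded dictionary is a fresh object with exactly the keys that the named placeholders of the Measurement INSERT statement refer to, so it could be passed to `pgexec` as the `args` parameter.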


In [None]:
import numpy as np
import csv
import pprint
# Organisation table
data_organisations = list(csv.DictReader(open('Organisations.csv')))

# make sure we are still connected to database 
if conn is None or conn.closed:
    conn = pgconnect()

# check for any existing content of the Organisations table
query_stmt = "SELECT COUNT(*) FROM Organisation"
print(query_stmt)
pgquery (conn, query_stmt, None)

# Try to load data - 9 tuples should be created
insert_stmt = """INSERT INTO Organisation(code,organisation) VALUES (%(Code)s, %(Organisation)s)"""
for row in data_organisations:
    pgexec (conn, insert_stmt, row, "row inserted")
    
#####################
# Station table    
data_stations = list(csv.DictReader(open('Stations.csv')))

# IMPORTANT: make sure the header line of CSV is without spaces!
insert_stmt = """INSERT INTO Station(station,siteName,commence,orgCode)
                      VALUES (%(station)s, %(siteName)s, %(commence)s,%(orgCode)s)"""
stationData = dict()
for row in data_stations:
    stationData['station'] = row['BasinNo']+row['Site']
    stationData['siteName']  = row['SiteName']
    stationData['commence'] = row['Commence']
    stationData['orgCode'] = row['OrgCode']
    pgexec (conn, insert_stmt, stationData, "station inserted")

# check content of Station table
query_stmt = "SELECT * FROM Station"
print(query_stmt)
pgquery (conn, query_stmt, None)



#####################
# Sensor table    
data_sensors = list(csv.DictReader(open('Sensors.csv')))

# IMPORTANT: make sure the header line of CSV is without spaces!
insert_stmt = """INSERT INTO Sensor(sensor,description,metric)
                      VALUES (%(sensor)s, %(description)s, %(metric)s)"""

for row in data_sensors:
    pgexec (conn, insert_stmt, row, "sensor inserted")

# check content of Sensor table
query_stmt = "SELECT * FROM Sensor"
print(query_stmt)
pgquery (conn, query_stmt, None)


#####################
# Measurement table    
data_measurements = list(csv.DictReader(open('Measurements.csv')))

# the following converts the five measurement columns to float values - or NaN
clean(data_measurements, 'Discharge', float, DEFAULT_VALUE)
clean(data_measurements, 'MeanDischarge', float, DEFAULT_VALUE)
clean(data_measurements, 'Level', float, DEFAULT_VALUE)
clean(data_measurements, 'Temp', float, DEFAULT_VALUE)
clean(data_measurements, 'EC', float, DEFAULT_VALUE)

# IMPORTANT: make sure the header line of CSV is without spaces!
insert_stmt = """INSERT INTO Measurement(station,date,sensor,value)
                      VALUES (%(station)s, %(date)s, %(sensor)s, %(value)s)"""

measurementData = dict()
sensorCodes = ['levl', 'disvol','disc','temp', 'ec']
sensor_columns = ['Level', 'MeanDischarge', 'Discharge','Temp', 'EC']
for row in data_measurements:
    measurementData['station'] = row['Station']
    measurementData['date'] = row['Date']
    for i in range(len(sensorCodes)):
        if np.isnan(row[sensor_columns[i]]):
            continue
        measurementData['sensor'] = sensorCodes[i]
        measurementData['value']  = row[sensor_columns[i]]
        pgexec (conn, insert_stmt, measurementData, "measurement inserted")
    

# check content of Measurement table
query_stmt = "SELECT * FROM Measurement"
print(query_stmt)
pgquery (conn, query_stmt, None)

query_stmt = "SELECT count(*) FROM Measurement"
print(query_stmt)
pgquery (conn, query_stmt, None)
conn.close()

## Your Task: Data Storage in PostgreSQL
Following the above pattern, make sure all the tables of our water schema are loaded with the data from the different CSV files.

In [None]:
# TODO: replace the content of this cell with your Python + SQL solution
raise NotImplementedError

# End of Exercise. Many Thanks.