# Week 4: Data transformation, cleaning and loading with Python

The following tutorial assumes a bit of background on SQL, in particular on its core commands 
to create new tables and to retrieve data:

 SQL Command   |  Meaning
 --------------|------------
 SELECT COUNT(\*) FROM *T*   | count how many tuples are stored in table *T*
 SELECT \* FROM *T*          | list the content of table *T*

You can learn more background on these SQL commands in the [Ed lessons on SQL][1] (Module 16 onwards).

  [1]: https://edstem.org/au/courses/14365/lessons/
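
To try the two commands from the table without a PostgreSQL server, here is a minimal sketch that uses Python's built-in `sqlite3` module with an in-memory database. The table `T` and its three rows are made up for illustration; the rest of this tutorial runs against PostgreSQL.

```python
import sqlite3

# In-memory SQLite database; it stands in for PostgreSQL only in this demo.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE T (code TEXT PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO T (code, name) VALUES (?, ?)",
    [("A", "first"), ("B", "second"), ("C", "third")],  # hypothetical rows
)

# SELECT COUNT(*) returns a single row holding the number of tuples in T.
row_count = conn.execute("SELECT COUNT(*) FROM T").fetchone()[0]

# SELECT * returns every tuple of T with all of its columns.
all_rows = conn.execute("SELECT * FROM T ORDER BY code").fetchall()

print(row_count)
print(all_rows)
conn.close()
```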

## Water Dataset
This week we work with a water dataset about the Murray River Basin in NSW. On Canvas (Module 4 section) there are four CSV files. Before you continue, put them in the same folder as this notebook.

**Important:** Make sure that the naming of all the files is as follows:
 1. Measurements.csv
 2. Organisations.csv
 3. Sensors.csv
 4. Stations.csv

## EXERCISE 1: Data Loading and Database Creation with Python

We will use `DictReader` from the `csv` module, which supports reading and writing files in comma-separated values (CSV) format.

We first load the content of the 'Organisations.csv' file into Python with csv.DictReader(), which turns every data line into a dictionary keyed by the column titles of the header line:

In [1]:
import csv
data_organisations = list(csv.DictReader(open('Organisations.csv')))
print(data_organisations[0])

{'Code': 'DNR', 'Organisation': 'NSW Department of Water and Energy (and predecessors)'}
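
The way `csv.DictReader` uses the header line as dictionary keys can be seen on a tiny example. The sketch below reads from an in-memory string instead of a file; the two data rows are hypothetical, only the header matches 'Organisations.csv'.

```python
import csv
import io

# Hypothetical CSV content with the same header line as Organisations.csv.
csv_text = (
    "Code,Organisation\n"
    "AAA,Example Organisation A\n"
    "BBB,Example Organisation B\n"
)

reader = csv.DictReader(io.StringIO(csv_text))
rows = list(reader)          # one dictionary per data line

print(reader.fieldnames)     # column titles taken from the header line
print(rows[0]["Code"])       # values are looked up by column title
print(len(rows))
```

All values come back as strings, which is why a later exercise converts the measurement columns to numbers.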


For larger data sets, the following steps would normally be run as a stand-alone Python program from a shell.
First, you need to establish a connection to the PostgreSQL database.
__Please edit the YOUR_DBNAME, YOUR_USERNAME and YOUR_PW variables in the code below to match your database login.__

In [None]:
import psycopg2

def pgconnect():
    # please replace with your own details
    YOUR_DBNAME = 'postgres'    # just use the name of your database
    YOUR_USERNAME = 'postgres'  # look this up in the server's settings
    YOUR_PW     = 'qq56523734'
    conn = None  # stays None if the connection attempt fails
    try: 
        conn = psycopg2.connect(host='localhost',
                                database=YOUR_DBNAME,
                                user=YOUR_USERNAME, 
                                password=YOUR_PW)
        print('connected')
    except Exception as e:
        print("unable to connect to the database")
        print(e)
    return conn

We will need to execute some SQL statements against the database. As we will do so several times, we write a dedicated function that executes an arbitrary SQL statement for which we do not expect any result. The function catches and reports any database error, and psycopg2's 'with' statements take care of the transaction handling: the code below automatically commits our SQL statement, and rolls it back if there was an error.

In [3]:
def pgexec( conn, sqlcmd, args, msg, silent=False ):
   """ utility function to execute some SQL statement
       can take optional arguments to fill in (dictionary)
       error and transaction handling built-in """
   retval = False
   with conn:
      with conn.cursor() as cur:
         try:
            if args is None:
               cur.execute(sqlcmd)
            else:
               cur.execute(sqlcmd, args)
            if silent == False: 
                print("success: " + msg)
            retval = True
         except Exception as e:
            if silent == False: 
                print("db error: ")
                print(e)
   return retval
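
The commit-or-rollback behaviour of a connection used in a 'with' statement can be tried out without a PostgreSQL server. The sketch below uses Python's built-in `sqlite3` module, whose connections behave in a comparable way as context managers; the table `demo` and its values are made up, and details of psycopg2's behaviour may differ.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (id INTEGER PRIMARY KEY)")

# Block finishes without an exception: the insert is committed.
with conn:
    conn.execute("INSERT INTO demo VALUES (1)")

# Second insert violates the primary key: the whole block is rolled back,
# including the otherwise valid insert of id 2.
try:
    with conn:
        conn.execute("INSERT INTO demo VALUES (2)")
        conn.execute("INSERT INTO demo VALUES (1)")
except sqlite3.IntegrityError as e:
    print("db error:", e)

rows = conn.execute("SELECT id FROM demo ORDER BY id").fetchall()
print(rows)
conn.close()
```

Note that in both libraries the 'with' block ends the transaction but does not close the connection, which is why the tutorial still calls conn.close() explicitly.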

Now let's load our previous data.
Important: whenever you use this approach, make sure that the column titles in the header line of your CSV file contain no spaces and no quotes. Otherwise csv.DictReader might read the file without complaint, but psycopg2's cursor.execute() will not find the keys it is looking for. We use named placeholders in the INSERT statement below (e.g. '%(Code)s'): each placeholder is replaced by the value stored under that key (here 'Code') in the dictionary passed to execute(). The trailing 's' is part of the placeholder syntax and is used for every data type, not only for strings.

In [4]:
# 1st: login to database
conn = pgconnect()

# if you want to reset the table
pgexec (conn, "DROP TABLE IF EXISTS Organisation CASCADE", None, "Reset Table Organisation")

# 2nd: ensure that the schema is in place
organisation_schema = """CREATE TABLE IF NOT EXISTS Organisation (
                         code VARCHAR(20) PRIMARY KEY,
                         orgName VARCHAR(150)
                   )"""
pgexec (conn, organisation_schema, None, "Create Table Organisation")

# 3rd: load data
# IMPORTANT: make sure the header line of CSV is without spaces!
insert_stmt = """INSERT INTO Organisation(code,orgName) VALUES (%(Code)s, %(Organisation)s)"""
for i, row in enumerate(data_organisations):
    pgexec (conn, insert_stmt, row, f"Row {i+1} inserted")

# 4th: keep in mind to close connection
conn.close()

connected
success: Reset Table Organisation
success: Create Table Organisation
success: Row 1 inserted
success: Row 2 inserted
success: Row 3 inserted
success: Row 4 inserted
success: Row 5 inserted
success: Row 6 inserted
success: Row 7 inserted
success: Row 8 inserted
success: Row 9 inserted
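
A quick way to catch header problems before talking to the database is to compare the placeholder names of the INSERT statement with the column titles that csv.DictReader found. The helper `missing_keys` below is a hypothetical utility written for this illustration, not part of psycopg2, and the two one-line CSV strings are made up.

```python
import csv
import io
import re

def missing_keys(insert_stmt, fieldnames):
    """Return the placeholder names in insert_stmt that are not CSV column titles."""
    wanted = re.findall(r"%\(([^)]+)\)s", insert_stmt)
    return [name for name in wanted if name not in fieldnames]

insert_stmt = "INSERT INTO Organisation(code,orgName) VALUES (%(Code)s, %(Organisation)s)"

# Header without spaces: every placeholder has a matching column title.
good = csv.DictReader(io.StringIO("Code,Organisation\nX,Y\n"))
# Header with a space after the comma: the key becomes ' Organisation'.
bad = csv.DictReader(io.StringIO("Code, Organisation\nX,Y\n"))

good_missing = missing_keys(insert_stmt, good.fieldnames)
bad_missing = missing_keys(insert_stmt, bad.fieldnames)
print(good_missing)
print(bad_missing)
```

An empty list means that every placeholder can be filled from the CSV rows.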


Next let's check whether this has all worked fine by querying our PostgreSQL database. To do so, we first introduce another utility function which again encapsulates all error and transaction handling. Then we query the new Organisation table and simply print out all tuples found.

In [5]:
def pgquery( conn, sqlcmd, args, silent=False ):
   """ utility function to execute some SQL query statement
       can take optional arguments to fill in (dictionary)
       will print out on screen the result set of the query
       error and transaction handling built-in """
   retval = False
   with conn:
      with conn.cursor() as cur:
         try:
            if args is None:
                cur.execute(sqlcmd)
            else:
                cur.execute(sqlcmd, args)
            if silent == False:
                for record in cur:
                    print(record)
            retval = True
         except Exception as e:
            if silent == False:
                print("db read error: ")
                print(e)
   return retval

In [6]:
# check content of Organisation table
conn = pgconnect()
query_stmt = "SELECT * FROM Organisation"
print(query_stmt)
pgquery (conn, query_stmt, None)
conn.close()

connected
SELECT * FROM Organisation
('DNR', 'NSW Department of Water and Energy (and predecessors)')
('DWR', 'NSW Department of Water and Energy (and predecessors)')
('MIL', 'Murray Irrigation Ltd')
('PWD', 'Manly Hydraulics Laboratory')
('QWR', 'Qld Department of Natural Resources and Water')
('SCA', 'Sydney Catchment Authority')
('SMA', 'Snowy Mountains Authority')
('SWB', 'Sydney Catchment Authority')
('VRW', 'Vic Government')


## Your Task: Data Loading

Try to create and load the Measurements table.

    1. Read the Measurements csv file
    2. Create a matching 'Measurement' table to hold the CSV data
    3. Load the content of the csv file into a local 'data_measurements' dictionary in Python
    4. Load the data from the 'data_measurements' dictionary into your PostgreSQL table
    5. Query and print its content

In [7]:
# TODO: replace the content of this cell with your Python + psycopg2 solution
# raise NotImplementedError

data_measurements = list(csv.DictReader(open('Measurements.csv')))
print(data_measurements[0])

# 1st: login to database
conn = pgconnect()

# if you want to reset the table
pgexec (conn, "DROP TABLE IF EXISTS Measurement CASCADE", None, "Reset Table Measurement")

# 2nd: ensure that the schema is in place
measurement_schema = """CREATE TABLE IF NOT EXISTS Measurement (
                         station VARCHAR(20), 
                         date DATE, 
                         level VARCHAR(20), 
                         meanDischarge VARCHAR(20), 
                         discharge VARCHAR(20), 
                         temp VARCHAR(20), 
                         ec VARCHAR(20)
                   )"""
pgexec (conn, measurement_schema, None, "Create Table Measurement")

# 3rd: load data
# IMPORTANT: make sure the header line of CSV is without spaces!
insert_stmt = """INSERT INTO Measurement(station,date,level,meandischarge,discharge,temp,ec)
                      VALUES (%(Station)s, %(Date)s, %(Level)s,%(MeanDischarge)s,%(Discharge)s,%(Temp)s,%(EC)s)"""
for i, row in enumerate(data_measurements):
    pgexec (conn, insert_stmt, row, f"Row {i+1} inserted")
    
# 4th: keep in mind to close connection
conn.close()

{'Station': '409204C', 'Date': '31-Dec-04', 'Level': '1.196', 'MeanDischarge': '5876.087', 'Discharge': '5873.837', 'Temp': '23.217', 'EC': '57.583'}
connected
success: Reset Table Measurement
success: Create Table Measurement
success: Row 1 inserted
success: Row 2 inserted
success: Row 3 inserted
success: Row 4 inserted
success: Row 5 inserted
success: Row 6 inserted
success: Row 7 inserted
success: Row 8 inserted
success: Row 9 inserted
success: Row 10 inserted
success: Row 11 inserted
success: Row 12 inserted
success: Row 13 inserted
success: Row 14 inserted
success: Row 15 inserted
success: Row 16 inserted
success: Row 17 inserted
success: Row 18 inserted
success: Row 19 inserted
success: Row 20 inserted
success: Row 21 inserted
success: Row 22 inserted
success: Row 23 inserted
success: Row 24 inserted
success: Row 25 inserted
success: Row 26 inserted
success: Row 27 inserted
success: Row 28 inserted
success: Row 29 inserted
success: Row 30 inserted
success: Row 31 inserted
...

success: Row 324 inserted
success: Row 325 inserted
success: Row 326 inserted
success: Row 327 inserted
success: Row 328 inserted
success: Row 329 inserted
success: Row 330 inserted
success: Row 331 inserted
success: Row 332 inserted
success: Row 333 inserted
success: Row 334 inserted
success: Row 335 inserted
success: Row 336 inserted
success: Row 337 inserted
success: Row 338 inserted
success: Row 339 inserted
success: Row 340 inserted
success: Row 341 inserted
success: Row 342 inserted
success: Row 343 inserted
success: Row 344 inserted
success: Row 345 inserted
success: Row 346 inserted
success: Row 347 inserted
success: Row 348 inserted
success: Row 349 inserted
success: Row 350 inserted
success: Row 351 inserted
success: Row 352 inserted
success: Row 353 inserted
success: Row 354 inserted
success: Row 355 inserted
success: Row 356 inserted
success: Row 357 inserted
success: Row 358 inserted
success: Row 359 inserted
success: Row 360 inserted
success: Row 361 inserted
...

# STOP PLEASE. THE FOLLOWING IS FOR THE NEXT EXERCISE. THANKS.

## EXERCISE 2: Data Cleaning

### Data Cleaning
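We use the following clean() function. It applies a conversion function to every value of one column and substitutes a default value whenever the conversion fails.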

In [8]:
import numpy as np
DEFAULT_VALUE = np.nan

def clean(data, column_key, convert_function, default_value):
    special_values= {} # no special values yet
    for row in data:
        old_value = row[column_key]
        new_value = default_value
        try:
            if old_value in special_values.keys():
                new_value = special_values[old_value]
            else:
                new_value = convert_function(old_value)
        except (ValueError, TypeError):
            print(f'Replacing {old_value} with {new_value} in column {column_key}')
            special_values[old_value] = new_value
        row[column_key] = new_value
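
To see what clean() does, it helps to run it on a handful of rows. In the sketch below the function is repeated unchanged so that the block runs on its own; the three rows are hypothetical, and `float("nan")` is used in place of `np.nan` only to avoid the numpy import (both are the same floating-point NaN).

```python
import math

def clean(data, column_key, convert_function, default_value):
    special_values = {}  # values that failed to convert, with their replacement
    for row in data:
        old_value = row[column_key]
        new_value = default_value
        try:
            if old_value in special_values.keys():
                new_value = special_values[old_value]
            else:
                new_value = convert_function(old_value)
        except (ValueError, TypeError):
            print(f'Replacing {old_value} with {new_value} in column {column_key}')
            special_values[old_value] = new_value
        row[column_key] = new_value

# Hypothetical rows: one valid number, one marker 'x', one empty string.
sample = [{'Level': '1.196'}, {'Level': 'x'}, {'Level': ''}]
clean(sample, 'Level', float, float('nan'))

print(sample[0]['Level'])              # converted to a float
print(math.isnan(sample[1]['Level']))  # 'x' could not be converted
print(math.isnan(sample[2]['Level']))  # neither could the empty string
```

The rows are modified in place, so the function returns nothing.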

In [9]:
# this conversion strips any leading or trailing spaces from the 'Station' values
clean(data_measurements, 'Station', str.strip, DEFAULT_VALUE)

# the following converts the measurement columns to float values - or NaN
clean(data_measurements, 'Level', float, DEFAULT_VALUE)
clean(data_measurements, 'MeanDischarge', float, DEFAULT_VALUE)
clean(data_measurements, 'Discharge', float, DEFAULT_VALUE)
clean(data_measurements, 'Temp', float, DEFAULT_VALUE)
clean(data_measurements, 'EC', float, DEFAULT_VALUE)

# now we insert data_measurements into the 'Measurement' table
# 1st: login to database
conn = pgconnect()

# 2nd: ensure that the schema is in place
pgexec (conn, "DROP TABLE IF EXISTS Measurement CASCADE", None, "Reset Table Measurement")
measurement_schema = """CREATE TABLE IF NOT EXISTS Measurement (
                         station   VARCHAR(20),
                         date DATE,
                         level  FLOAT,
                         meanDischarge FLOAT,
                         discharge  FLOAT,
                         temp FLOAT,
                         ec  FLOAT
                      )"""
pgexec (conn, measurement_schema, None, "Create Table Measurement")

# 3rd: load data
# IMPORTANT: make sure the header line of CSV is without spaces!
insert_stmt = """INSERT INTO Measurement(station,date,level,meandischarge,discharge,temp,ec)
                      VALUES (%(Station)s,%(Date)s,%(Level)s,%(MeanDischarge)s,%(Discharge)s,%(Temp)s,%(EC)s)"""
for i, row in enumerate(data_measurements):
    pgexec (conn, insert_stmt, row, f"Row {i+1} inserted")
    
query_stmt = "SELECT COUNT(*) FROM Measurement"
print(query_stmt)
pgquery (conn, query_stmt, None)

# 4th: keep in mind to close connection
conn.close()

Replacing x with nan in column Level
Replacing x with nan in column MeanDischarge
Replacing x with nan in column Discharge
Replacing x with nan in column Temp
Replacing x with nan in column EC
connected
success: Reset Table Measurement
success: Create Table Measurement
success: Row 1 inserted
success: Row 2 inserted
success: Row 3 inserted
success: Row 4 inserted
success: Row 5 inserted
success: Row 6 inserted
success: Row 7 inserted
success: Row 8 inserted
success: Row 9 inserted
success: Row 10 inserted
success: Row 11 inserted
success: Row 12 inserted
success: Row 13 inserted
success: Row 14 inserted
success: Row 15 inserted
success: Row 16 inserted
success: Row 17 inserted
success: Row 18 inserted
success: Row 19 inserted
success: Row 20 inserted
success: Row 21 inserted
success: Row 22 inserted
success: Row 23 inserted
success: Row 24 inserted
success: Row 25 inserted
success: Row 26 inserted
success: Row 27 inserted
success: Row 28 inserted
success: Row 29 inserted
...

success: Row 347 inserted
success: Row 348 inserted
success: Row 349 inserted
success: Row 350 inserted
success: Row 351 inserted
success: Row 352 inserted
success: Row 353 inserted
success: Row 354 inserted
success: Row 355 inserted
success: Row 356 inserted
success: Row 357 inserted
success: Row 358 inserted
success: Row 359 inserted
success: Row 360 inserted
success: Row 361 inserted
success: Row 362 inserted
success: Row 363 inserted
success: Row 364 inserted
success: Row 365 inserted
success: Row 366 inserted
success: Row 367 inserted
success: Row 368 inserted
success: Row 369 inserted
success: Row 370 inserted
success: Row 371 inserted
success: Row 372 inserted
success: Row 373 inserted
success: Row 374 inserted
success: Row 375 inserted
success: Row 376 inserted
success: Row 377 inserted
success: Row 378 inserted
success: Row 379 inserted
success: Row 380 inserted
success: Row 381 inserted
success: Row 382 inserted
success: Row 383 inserted
success: Row 384 inserted
...

## Your Task: Data Cleaning

Use the clean() function above to clean the other datasets too.
 1. Read the Stations csv file into data_stations
 2. Clean the 'data_stations' data set
 3. Load the 'data_stations' dictionary into your database
 4. Query the 'Station' table - what difference do you see?
 
 5. If you have time: Do all of the above (reading - cleaning - loading) also for the 'Sensors.csv' data set

Note: You might encounter a few warning and error messages.
   - If a connection is closed, you have to open the database connection again first.
   - If the clean() function reports that some string was replaced with NaN, you can ignore the message as long as the column is indeed numeric. It just tells you that the function is doing what it is supposed to do.
   - If you try to insert data into a table that already contains data, you might get 'duplicate primary key' error messages. Again, you can ignore those for the moment.
   - If you want to see how much data is already in your table, use the following SQL query:
     -  SELECT COUNT(*) FROM Station;
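
A side note on the NaN values: a PostgreSQL FLOAT column can hold NaN, and NaN is not the same as NULL. If you would rather store missing measurements as NULL, one option is to turn NaN into None before inserting, since psycopg2 sends None as NULL. The helper `nan_to_none` below is a hypothetical sketch of that idea and not part of the tutorial's solution; the sample row is made up.

```python
import math

def nan_to_none(row):
    """Return a copy of row in which float NaN values are replaced by None."""
    return {
        key: None if isinstance(value, float) and math.isnan(value) else value
        for key, value in row.items()
    }

# Hypothetical cleaned row with one missing measurement.
row = {"Station": "409204C", "Level": 1.196, "Temp": float("nan")}
converted = nan_to_none(row)
print(converted)
```

The converted dictionary can be passed to pgexec() in place of the original row.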

In [10]:
# TODO: replace the content of this cell with your Python solution
# raise NotImplementedError

import csv

data_stations = list(csv.DictReader(open('Stations.csv')))
print(data_stations[0])

# 1st: login to database
conn = pgconnect()

# if you want to reset the table
pgexec (conn, "DROP TABLE IF EXISTS Station CASCADE", None, "Reset Table Station")
# 2nd: ensure that the schema is in place
stations_schema = """CREATE TABLE IF NOT EXISTS Station (
                         station VARCHAR(20) PRIMARY KEY, 
                         siteName VARCHAR(150), 
                         commence DATE, 
                         orgCode VARCHAR(20)
                   )"""


pgexec (conn, stations_schema, None, "Create Table Station")
pgexec (conn, "SET datestyle = 'ISO,DMY'; ", None, "Modify datestyle")

# 3rd: load data
# IMPORTANT: make sure the header line of CSV is without spaces!
insert_stmt = """INSERT INTO Station(station, siteName,commence,orgCode)
                      VALUES (%(Station)s, %(SiteName)s, %(Commence)s, %(OrgCode)s)"""

stationData = dict()
for i, row in enumerate(data_stations):
    stationData['Station'] = row['BasinNo']+row['Site']
    stationData['SiteName']  = row['SiteName']
    stationData['Commence'] = row['Commence']
    stationData['OrgCode'] = row['OrgCode']
    
    pgexec (conn, insert_stmt, stationData, f"Row {i+1} inserted")

query_stmt = "SELECT * FROM Station"
print(query_stmt)
pgquery (conn, query_stmt, None)

# 4th: keep in mind to close connection
conn.close()

{'BasinNo': '409', 'Site': '001', 'SiteName': 'Murray River at Albury (Union Bridge)', 'Long': '146.8957 E', 'Lat': '36.0929 S', 'Commence': '14/04/1892', 'OrgCode': 'DWR'}
connected
success: Reset Table Station
success: Create Table Station
success: Modify datestyle
success: Row 1 inserted
success: Row 2 inserted
success: Row 3 inserted
success: Row 4 inserted
success: Row 5 inserted
success: Row 6 inserted
success: Row 7 inserted
success: Row 8 inserted
success: Row 9 inserted
SELECT * FROM Station
('409001', 'Murray River at Albury (Union Bridge)', datetime.date(1892, 4, 14), 'DWR')
('409002', 'Murray River at Corowa', datetime.date(1894, 4, 1), 'DWR')
('409003', 'Murray River at Denuquin', datetime.date(1896, 9, 1), 'DWR')
('409005', 'Murray River at Barham', datetime.date(1900, 12, 31), 'DWR')
('409204C', 'Murray River @ Swan Hill', datetime.date(1904, 12, 31), 'VRW')
('409017', 'Murray River @ Doctors Point', datetime.date(1925, 8, 22), 'DWR')
('409019', 'Wakool River at Offtake 
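
The solution above sets `datestyle` to `ISO,DMY` so that PostgreSQL reads a value such as `14/04/1892` day first. Why the order matters can be shown in plain Python with `datetime.strptime`; this is only an illustration of the ambiguity, not how PostgreSQL parses dates internally, and the second date string is hypothetical.

```python
from datetime import date, datetime

DMY = "%d/%m/%Y"  # day first, as used in Stations.csv
MDY = "%m/%d/%Y"  # month first

commence = "14/04/1892"  # 'Commence' value of the first station printed above
parsed = datetime.strptime(commence, DMY).date()

# Read month first, the same string is invalid: there is no month 14.
try:
    datetime.strptime(commence, MDY)
    mdy_ok = True
except ValueError:
    mdy_ok = False

# Worse, some strings are valid in both orders but mean different days.
ambiguous = "03/04/2001"  # hypothetical value
as_dmy = datetime.strptime(ambiguous, DMY).date()
as_mdy = datetime.strptime(ambiguous, MDY).date()

print(parsed, mdy_ok)
print(as_dmy, as_mdy)
```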

# STOP PLEASE. THE FOLLOWING IS FOR THE NEXT EXERCISE. THANKS.

## EXERCISE 3: Database Creation
The next step is to create the SQL schema (see below) in your PostgreSQL database.

### Your Task: DB Creation in PostgreSQL
Create the corresponding tables in PostgreSQL which follow from the data model.

<pre>
DROP TABLE IF EXISTS Organisation CASCADE;
CREATE TABLE IF NOT EXISTS Organisation (
   code VARCHAR(20) PRIMARY KEY,
   organisation    VARCHAR(150)
);

DROP TABLE IF EXISTS Station CASCADE;
CREATE TABLE IF NOT EXISTS Station (
     station   VARCHAR(50) PRIMARY KEY,
     siteName  VARCHAR(50),
     commence DATE,
     orgCode  VARCHAR(50),
     CONSTRAINT orgCodeFK FOREIGN KEY (orgCode)   REFERENCES Organisation (code)
 );

DROP TABLE IF EXISTS Sensor CASCADE;
CREATE TABLE IF NOT EXISTS Sensor (
     sensor   VARCHAR(20) PRIMARY KEY,
     description  VARCHAR(150) ,       
     metric VARCHAR(20)                 
  );

DROP TABLE IF EXISTS Measurement CASCADE;
CREATE TABLE IF NOT EXISTS Measurement (
     station   VARCHAR(20),
     sensor   VARCHAR(20),
     date DATE,
     value  FLOAT,
     CONSTRAINT stationFK FOREIGN KEY (station)   REFERENCES Station (Station),
     CONSTRAINT sensorFK FOREIGN KEY (sensor)   REFERENCES sensor (sensor)
  ); 
</pre>

In [11]:
# TODO: replace the content of this cell with your Python + SQL solution
# raise NotImplementedError

table_schema = """
DROP TABLE IF EXISTS Organisation CASCADE;
CREATE TABLE IF NOT EXISTS Organisation (
   code VARCHAR(20) PRIMARY KEY,
   organisation    VARCHAR(150)
);

DROP TABLE IF EXISTS Station CASCADE;
CREATE TABLE IF NOT EXISTS Station (
     station   VARCHAR(50) PRIMARY KEY,
     siteName  VARCHAR(50),
     commence DATE,
     orgCode  VARCHAR(50),
     CONSTRAINT orgCodeFK FOREIGN KEY (orgCode)   REFERENCES Organisation (code)
 );

DROP TABLE IF EXISTS Sensor CASCADE;
CREATE TABLE IF NOT EXISTS Sensor (
     sensor   VARCHAR(20) PRIMARY KEY,
     description  VARCHAR(150) ,       
     metric VARCHAR(20)                 
  );

DROP TABLE IF EXISTS Measurement CASCADE;
CREATE TABLE IF NOT EXISTS Measurement (
     station   VARCHAR(20),
     sensor   VARCHAR(20),
     date DATE,
     value  FLOAT,
     CONSTRAINT stationFK FOREIGN KEY (station)   REFERENCES Station (Station),
     CONSTRAINT sensorFK FOREIGN KEY (sensor)   REFERENCES sensor (sensor)
  );
"""
conn = pgconnect()
pgexec(conn, table_schema, None, "Created our data warehouse")
conn.close()

connected
success: Created our data warehouse
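
The foreign keys in this schema have a practical consequence for loading: a Station row can only be inserted once the Organisation it refers to exists. The sketch below demonstrates this with Python's built-in `sqlite3` module and a reduced version of the two tables (column types simplified to TEXT, which is an assumption of this demo); PostgreSQL would reject the first insert in a similar way.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces foreign keys only when asked to
conn.execute("CREATE TABLE Organisation (code TEXT PRIMARY KEY, organisation TEXT)")
conn.execute(
    "CREATE TABLE Station ("
    " station TEXT PRIMARY KEY,"
    " siteName TEXT,"
    " orgCode TEXT REFERENCES Organisation (code))"
)

station = ("409001", "Murray River at Albury (Union Bridge)", "DWR")

# Loading the station first fails: organisation 'DWR' does not exist yet.
try:
    conn.execute("INSERT INTO Station VALUES (?, ?, ?)", station)
    station_first_worked = True
except sqlite3.IntegrityError as e:
    station_first_worked = False
    print("db error:", e)

# Once the organisation is loaded, the same station insert succeeds.
conn.execute(
    "INSERT INTO Organisation VALUES (?, ?)",
    ("DWR", "NSW Department of Water and Energy (and predecessors)"),
)
conn.execute("INSERT INTO Station VALUES (?, ?, ?)", station)
station_count = conn.execute("SELECT COUNT(*) FROM Station").fetchone()[0]
print(station_count)
conn.close()
```

This is why the next exercise loads Organisation, Station and Sensor before Measurement.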


# STOP PLEASE. THE FOLLOWING IS FOR THE NEXT EXERCISE. THANKS.

## EXERCISE 4: Data Loading and Storage

Up to this point, we have
 - analysed and modelled the given dataset
 - created a corresponding relational star schema
 - read the individual CSV files into Python dictionary data structures
 - cleaned the raw data with regard to missing or inconsistent entries and data types
 
The final step is to load this cleaned data into the corresponding tables of the star schema which we defined so far.

For this to work, you will probably need to write some logic that loads different parts of the data dictionaries (which hold the content of the CSV files) into different tables.
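
The main piece of such logic is turning one wide row of 'Measurements.csv' (one column per sensor) into several narrow rows for the Measurement table (one row per sensor value). The function `to_long_format` below is a hypothetical sketch of that step; the pairing of columns to sensor codes follows the solution cell of this exercise, and the sample row is the first CSV row with its Temp value replaced by NaN for illustration.

```python
import math

# CSV column -> sensor code, in the order used by the solution cell.
SENSOR_FOR_COLUMN = {
    "Level": "levl",
    "MeanDischarge": "disvol",
    "Discharge": "disc",
    "Temp": "temp",
    "EC": "ec",
}

def to_long_format(row):
    """Turn one cleaned wide row into a list of Measurement dictionaries."""
    records = []
    for column, sensor in SENSOR_FOR_COLUMN.items():
        value = row[column]
        if math.isnan(value):
            continue  # no reading for this sensor on this day
        records.append({
            "station": row["Station"],
            "date": row["Date"],
            "sensor": sensor,
            "value": value,
        })
    return records

wide_row = {
    "Station": "409204C", "Date": "31-Dec-04", "Level": 1.196,
    "MeanDischarge": 5876.087, "Discharge": 5873.837,
    "Temp": float("nan"), "EC": 57.583,
}
long_rows = to_long_format(wide_row)
for record in long_rows:
    print(record)
```

Each returned dictionary has exactly the keys that the Measurement INSERT statement expects.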


In [12]:
import numpy as np
import csv

# make sure we are still connected to database 
conn = pgconnect()

#####################
# Organisation table
data_organisations = list(csv.DictReader(open('Organisations.csv')))

# check for any existing content of the Organisations table
query_stmt = "SELECT COUNT(*) FROM Organisation"
print(query_stmt)
pgquery (conn, query_stmt, None)

# Try to load data - 9 tuples should be created
insert_stmt = """INSERT INTO Organisation(code,organisation) VALUES (%(Code)s, %(Organisation)s)"""
for i, row in enumerate(data_organisations):
    pgexec (conn, insert_stmt, row, f"Organisation {i+1} inserted")
    
#####################
# Station table    
data_stations = list(csv.DictReader(open('Stations.csv')))

# check for any existing content of the Station table
query_stmt = "SELECT COUNT(*) FROM Station"
print(query_stmt)
pgquery (conn, query_stmt, None)

pgexec(conn, "SET datestyle = 'ISO,DMY'; ", None, "Modify datestyle")

# IMPORTANT: make sure the header line of CSV is without spaces!
insert_stmt = """INSERT INTO Station(station,siteName,commence,orgCode)
                      VALUES (%(station)s, %(siteName)s, %(commence)s, %(orgCode)s)"""
stationData = dict()
for i, row in enumerate(data_stations):
    stationData['station'] = row['BasinNo']+row['Site']
    stationData['siteName']  = row['SiteName']
    stationData['commence'] = row['Commence']
    stationData['orgCode'] = row['OrgCode']
    pgexec (conn, insert_stmt, stationData, f"Station {i+1} inserted")

#####################
# Sensor table    
data_sensors = list(csv.DictReader(open('Sensors.csv')))

# check for any existing content of the Sensor table
query_stmt = "SELECT COUNT(*) FROM Sensor"
print(query_stmt)
pgquery (conn, query_stmt, None)

# IMPORTANT: make sure the header line of CSV is without spaces!
insert_stmt = """INSERT INTO Sensor(sensor,description,metric)
                      VALUES (%(sensor)s, %(description)s, %(metric)s)"""

for i, row in enumerate(data_sensors):
    pgexec (conn, insert_stmt, row, f"Sensor {i+1} inserted")

#####################
# Measurement table    
data_measurements = list(csv.DictReader(open('Measurements.csv')))

# this conversion strips any leading or trailing spaces from the 'Station' values
clean(data_measurements, 'Station', str.strip, DEFAULT_VALUE)

# the following converts the measurement columns to float values - or NaN
clean(data_measurements, 'Discharge', float, DEFAULT_VALUE)
clean(data_measurements, 'MeanDischarge', float, DEFAULT_VALUE)
clean(data_measurements, 'Level', float, DEFAULT_VALUE)
clean(data_measurements, 'Temp', float, DEFAULT_VALUE)
clean(data_measurements, 'EC', float, DEFAULT_VALUE)

# check for any existing content of the Measurement table
query_stmt = "SELECT COUNT(*) FROM Measurement"
print(query_stmt)
pgquery (conn, query_stmt, None)

# IMPORTANT: make sure the header line of CSV is without spaces!
insert_stmt = """INSERT INTO Measurement(station,date,sensor,value)
                      VALUES (%(station)s, %(date)s, %(sensor)s, %(value)s)"""

measurementData = dict()
sensorCodes = ['levl', 'disvol','disc','temp', 'ec']
sensor_columns = ['Level', 'MeanDischarge', 'Discharge','Temp', 'EC']
cnt = 1
for row in data_measurements:
    measurementData['station'] = row['Station']
    measurementData['date'] = row['Date']
    # one Measurement tuple per sensor column that holds a value
    for sensor_code, column in zip(sensorCodes, sensor_columns):
        if np.isnan(row[column]):
            continue  # skip missing values
        measurementData['sensor'] = sensor_code
        measurementData['value']  = row[column]
        pgexec (conn, insert_stmt, measurementData, f"Measurement {cnt} inserted")
        cnt += 1

conn.close()

connected
SELECT COUNT(*) FROM Organisation
(0,)
success: Organisation 1 inserted
success: Organisation 2 inserted
success: Organisation 3 inserted
success: Organisation 4 inserted
success: Organisation 5 inserted
success: Organisation 6 inserted
success: Organisation 7 inserted
success: Organisation 8 inserted
success: Organisation 9 inserted
SELECT COUNT(*) FROM Station
(0,)
success: Modify datestyle
success: Station 1 inserted
success: Station 2 inserted
success: Station 3 inserted
success: Station 4 inserted
success: Station 5 inserted
success: Station 6 inserted
success: Station 7 inserted
success: Station 8 inserted
success: Station 9 inserted
SELECT COUNT(*) FROM Sensor
(0,)
success: Sensor 1 inserted
success: Sensor 2 inserted
success: Sensor 3 inserted
success: Sensor 4 inserted
success: Sensor 5 inserted
success: Sensor 6 inserted
success: Sensor 7 inserted
Replacing x with nan in column Discharge
Replacing x with nan in column MeanDischarge
Replacing x with nan in column Level
...

success: Measurement 272 inserted
success: Measurement 273 inserted
success: Measurement 274 inserted
success: Measurement 275 inserted
success: Measurement 276 inserted
success: Measurement 277 inserted
success: Measurement 278 inserted
success: Measurement 279 inserted
success: Measurement 280 inserted
success: Measurement 281 inserted
success: Measurement 282 inserted
success: Measurement 283 inserted
success: Measurement 284 inserted
success: Measurement 285 inserted
success: Measurement 286 inserted
success: Measurement 287 inserted
success: Measurement 288 inserted
success: Measurement 289 inserted
success: Measurement 290 inserted
success: Measurement 291 inserted
success: Measurement 292 inserted
success: Measurement 293 inserted
success: Measurement 294 inserted
success: Measurement 295 inserted
success: Measurement 296 inserted
success: Measurement 297 inserted
success: Measurement 298 inserted
success: Measurement 299 inserted
success: Measurement 300 inserted
...

success: Measurement 533 inserted
success: Measurement 534 inserted
success: Measurement 535 inserted
success: Measurement 536 inserted
success: Measurement 537 inserted
success: Measurement 538 inserted
success: Measurement 539 inserted
success: Measurement 540 inserted
success: Measurement 541 inserted
success: Measurement 542 inserted
success: Measurement 543 inserted
success: Measurement 544 inserted
success: Measurement 545 inserted
success: Measurement 546 inserted
success: Measurement 547 inserted
success: Measurement 548 inserted
success: Measurement 549 inserted
success: Measurement 550 inserted
success: Measurement 551 inserted
success: Measurement 552 inserted
success: Measurement 553 inserted
success: Measurement 554 inserted
success: Measurement 555 inserted
success: Measurement 556 inserted
success: Measurement 557 inserted
success: Measurement 558 inserted
success: Measurement 559 inserted
success: Measurement 560 inserted
success: Measurement 561 inserted
...

success: Measurement 778 inserted
success: Measurement 779 inserted
success: Measurement 780 inserted
success: Measurement 781 inserted
success: Measurement 782 inserted
success: Measurement 783 inserted
success: Measurement 784 inserted
success: Measurement 785 inserted
success: Measurement 786 inserted
success: Measurement 787 inserted
success: Measurement 788 inserted
success: Measurement 789 inserted
success: Measurement 790 inserted
success: Measurement 791 inserted
success: Measurement 792 inserted
success: Measurement 793 inserted
success: Measurement 794 inserted
success: Measurement 795 inserted
success: Measurement 796 inserted
success: Measurement 797 inserted
success: Measurement 798 inserted
success: Measurement 799 inserted
success: Measurement 800 inserted
success: Measurement 801 inserted
success: Measurement 802 inserted
success: Measurement 803 inserted
success: Measurement 804 inserted
success: Measurement 805 inserted
success: Measurement 806 inserted
...

success: Measurement 1019 inserted
success: Measurement 1020 inserted
success: Measurement 1021 inserted
success: Measurement 1022 inserted
success: Measurement 1023 inserted
success: Measurement 1024 inserted
success: Measurement 1025 inserted
success: Measurement 1026 inserted
success: Measurement 1027 inserted
success: Measurement 1028 inserted
success: Measurement 1029 inserted
success: Measurement 1030 inserted
success: Measurement 1031 inserted
success: Measurement 1032 inserted
success: Measurement 1033 inserted
success: Measurement 1034 inserted
success: Measurement 1035 inserted
success: Measurement 1036 inserted
success: Measurement 1037 inserted
success: Measurement 1038 inserted
success: Measurement 1039 inserted
success: Measurement 1040 inserted
success: Measurement 1041 inserted
success: Measurement 1042 inserted
success: Measurement 1043 inserted
success: Measurement 1044 inserted
success: Measurement 1045 inserted
success: Measurement 1046 inserted
...

success: Measurement 1325 inserted
success: Measurement 1326 inserted
success: Measurement 1327 inserted
success: Measurement 1328 inserted
success: Measurement 1329 inserted
success: Measurement 1330 inserted
success: Measurement 1331 inserted
success: Measurement 1332 inserted
success: Measurement 1333 inserted
success: Measurement 1334 inserted
success: Measurement 1335 inserted
success: Measurement 1336 inserted
success: Measurement 1337 inserted
success: Measurement 1338 inserted
success: Measurement 1339 inserted
success: Measurement 1340 inserted
success: Measurement 1341 inserted
success: Measurement 1342 inserted
success: Measurement 1343 inserted
success: Measurement 1344 inserted
success: Measurement 1345 inserted
success: Measurement 1346 inserted
success: Measurement 1347 inserted
success: Measurement 1348 inserted
success: Measurement 1349 inserted
success: Measurement 1350 inserted
success: Measurement 1351 inserted
success: Measurement 1352 inserted
...

success: Measurement 1581 inserted
success: Measurement 1582 inserted
success: Measurement 1583 inserted
success: Measurement 1584 inserted
success: Measurement 1585 inserted
success: Measurement 1586 inserted
success: Measurement 1587 inserted
success: Measurement 1588 inserted
success: Measurement 1589 inserted
success: Measurement 1590 inserted
success: Measurement 1591 inserted
success: Measurement 1592 inserted
success: Measurement 1593 inserted
success: Measurement 1594 inserted
success: Measurement 1595 inserted
success: Measurement 1596 inserted
success: Measurement 1597 inserted
success: Measurement 1598 inserted
success: Measurement 1599 inserted
success: Measurement 1600 inserted
success: Measurement 1601 inserted
success: Measurement 1602 inserted
success: Measurement 1603 inserted
success: Measurement 1604 inserted
success: Measurement 1605 inserted
success: Measurement 1606 inserted
success: Measurement 1607 inserted
success: Measurement 1608 inserted
...

## Your Task: Data Storage in PostgreSQL
Following the above pattern, make sure all the tables of our water schema are loaded with the data from the different CSV files.

In [13]:
# TODO: replace the content of this cell with your Python + SQL solution
# raise NotImplementedError

conn = pgconnect()
    
# check content of Organisations table
query_stmt = "SELECT COUNT(*) FROM Organisation"
print(query_stmt)
pgquery (conn, query_stmt, None)

# check content of Station table
query_stmt = "SELECT count(*) FROM Station"
print(query_stmt)
pgquery (conn, query_stmt, None)

# check content of Sensor table
query_stmt = "SELECT count(*) FROM Sensor"
print(query_stmt)
pgquery (conn, query_stmt, None)

# check content of Measurement table
query_stmt = "SELECT count(*) FROM Measurement"
print(query_stmt)
pgquery (conn, query_stmt, None)

conn.close()

connected
SELECT COUNT(*) FROM Organisation
(9,)
SELECT count(*) FROM Station
(9,)
SELECT count(*) FROM Sensor
(7,)
SELECT count(*) FROM Measurement
(1793,)


# End of Tutorial. Many Thanks.