# Storing Storm Data

## Introduction

Recently, the International Hurricane Watchgroup (IHW) has been asked to update their analysis tools. Because of the increase in public awareness of hurricanes, they are required to be more diligient with the analysis of historical hurricane data they share across the organization. They have asked you, someone with experience in databases, to help work with their team to productionize their services.

Accepting the job, their team tells you that they have been having trouble sharing data across the teams and keeping it consistent. From what they've told you, it seems that their method of sharing the data with their data anaylsts has been to save a CSV file on their local servers and have every data analyst pull the data down. Then, each analyst uses a local SQLite engine to store the CSV, run their queries, and send their results around.

## CSV Columns

From what they have told you, you might be thinking that this is an inefficient way of sharing data. To understand what you will be working on, they have sent you a CSV file. Their CSV file contains the following fields:
- **fid** - ID for the row
- **year** - Recorded year
- **month** - Recorded month
- **day** - Recorded date
- **ad_time** - Recorded time in UTC
- **btid** - Hurricane ID
- **name** - Name of the hurricane
- **lat** - Latitude of the recorded location
- **long** - Longitude of the recorded location
- **wind_kts** - Wind speed in knots per second
- **pressure** - Atmospheric pressure of the hurricane
- **cat** - Hurricane category
- **basin** - The basin the hurricane is located
- **shape_leng** - Hurricane shape length

## Importing packages and reading dataset

In [1]:
import psycopg2
import csv
import pandas as pd
import numpy as np
from urllib import request
import io

In [2]:
#We have downloaded the CSV and we can read it from local disk

with open("my_datasets/storm_data.csv","r", encoding = 'utf-8-sig') as f:
    reader = csv.reader(f)
    data = list(reader)

In [3]:
#Or we could use this to read a csv online

# response = request.urlopen('https://dq-content.s3.amazonaws.com/251/storm_data.csv')
# reader = csv.reader(io.TextIOWrapper(response))
# data = list(reader)

In [4]:
# Checking data as DataFrame
df = pd.DataFrame(data[1:], columns = data[0])
df.head()

Unnamed: 0,FID,YEAR,MONTH,DAY,AD_TIME,BTID,NAME,LAT,LONG,WIND_KTS,PRESSURE,CAT,BASIN,Shape_Leng
0,2001,1957,8,8,1800Z,63,NOTNAMED,22.5,-140.0,50,0,TS,Eastern Pacific,1.140175
1,2002,1961,10,3,1200Z,116,PAULINE,22.1,-140.2,45,0,TS,Eastern Pacific,1.16619
2,2003,1962,8,29,0600Z,124,C,18.0,-140.0,45,0,TS,Eastern Pacific,2.10238
3,2004,1967,7,14,0600Z,168,DENISE,16.6,-139.5,45,0,TS,Eastern Pacific,2.12132
4,2005,1972,8,16,1200Z,251,DIANA,18.5,-139.8,70,0,H1,Eastern Pacific,1.702939


## CSV Datatypes

Let's choose the proper datatypes for the different columns before creating the Postgres table:

- **fid** - INTEGER (4 bytes)
- **year** - SMALLINT (2 bytes)
- **month** - SMALLINT (2 bytes)
- **day** - SMALLINT (2 bytes)
- **ad_time** - VARCHAR(5): Text field with up to 5 characters.
- **btid** - INTEGER (4 bytes)
- **name** - TEXT
- **lat** - DECIMAL(4, 1): Precision 4, Scale 1. That means total number of digits up to 4, with up to 1 decimals.
- **long** - DECIMAL(4, 1): Precision 4, Scale 1. That means total number of digits up to 4, with up to 1 decimals.
- **wind_kts** - SMALLINT (2 bytes) 
- **pressure** - SMALLINT (2 bytes)
- **cat** - VARCHAR(2): Text field with up to 2 characters.
- **basin** - VARCHAR(20)
- **shape_leng** - DECIMAL(7, 6): Precision 7, Scale 6. That means total number of digits up to 7, with up to 6 decimals.

However, we could merge **year**, **month**, **day** and **ad_time** columns into a DATE type column.

## Final PostgreSQL columns

- **fid** - INTEGER (4 bytes)
- **recorded_datetime** - TIMESTAMP: No timezone needed as all are UTC
- **btid** - INTEGER (4 bytes)
- **name** - TEXT
- **lat** - DECIMAL(4, 1): Precision 4, Scale 1. That means total number of digits up to 4, with up to 1 decimals.
- **long** - DECIMAL(4, 1): Precision 4, Scale 1. That means total number of digits up to 4, with up to 1 decimals.
- **wind_kts** - SMALLINT (2 bytes) 
- **pressure** - SMALLINT (2 bytes)
- **cat** - VARCHAR(2): Text field with up to 2 characters.
- **basin** - VARCHAR(20)
- **shape_leng** - DECIMAL(7, 6): Precision 7, Scale 6. That means total number of digits up to 7, with up to 6 decimals.

## PostgreSQL new database creation

In [5]:
conn = psycopg2.connect(dbname="postgres", user="postgres")
#Autocommit needed for database creation: CREATE DATABASE cannot be called in transaction
conn.autocommit = True
cur = conn.cursor()
cur.execute("DROP DATABASE IF EXISTS project32")
try:
    cur.execute("CREATE DATABASE project32 OWNER postgres")
    conn.autocommit = False
    conn.close()
    print("Database created.")
except:
    conn.autocommit = False
    conn.close()
    print("Problem during the database creation.\nIt might already exist.")

Database created.


## Creating new table

In [6]:
conn = psycopg2.connect(dbname="project32", user="postgres")
cur = conn.cursor()
cur.execute("DROP TABLE IF EXISTS hurricane_table")
cur.execute(
"""
CREATE TABLE hurricane_table (
    fid INTEGER PRIMARY KEY,
    recorded_datetime TIMESTAMP,
    btid INTEGER,
    name TEXT,
    lat DECIMAL(4, 1),
    long DECIMAL(4, 1),
    wind_kts SMALLINT,
    pressure SMALLINT,
    cat VARCHAR(2),
    basin VARCHAR(20),
    shape_leng DECIMAL(8, 6)
)
"""
)
conn.commit()
cur.execute(
"""
SELECT column_name, ordinal_position, data_type
FROM information_schema.columns 
WHERE table_name='hurricane_table'
"""
)
# Colnames to get the fields names
colnames = [desc[0] for desc in cur.description]
table_info = pd.DataFrame(cur.fetchall(), columns=colnames)
conn.close()

In [7]:
# Show the information of the new table as DataFrame
table_info

Unnamed: 0,column_name,ordinal_position,data_type
0,fid,1,integer
1,recorded_datetime,2,timestamp without time zone
2,btid,3,integer
3,name,4,text
4,lat,5,numeric
5,long,6,numeric
6,wind_kts,7,smallint
7,pressure,8,smallint
8,cat,9,character varying
9,basin,10,character varying


## Creating new users for the database

In [8]:
conn = psycopg2.connect(dbname="project32", user="postgres")
cur = conn.cursor()
#Revoke needed before drop. You are not allowed to drop a user if it has privileges
cur.execute(
"""
REVOKE ALL ON hurricane_table FROM data_production;
REVOKE ALL ON hurricane_table FROM data_analyst;
DROP USER IF EXISTS data_production;
DROP USER IF EXISTS data_analyst
"""
)
cur.execute(
"""
CREATE USER data_production;
CREATE USER data_analyst;
REVOKE ALL ON hurricane_table FROM data_production;
REVOKE ALL ON hurricane_table FROM data_analyst;
GRANT SELECT, INSERT, UPDATE ON hurricane_table TO data_production;
GRANT SELECT ON hurricane_table TO data_analyst;
"""
)
conn.commit()
cur.execute(
"""
SELECT grantee, table_catalog, table_schema, table_name, privilege_type
FROM   information_schema.table_privileges 
WHERE  grantee = 'data_production' OR grantee = 'data_analyst'
"""
)
colnames = [desc[0] for desc in cur.description]
users_info = pd.DataFrame(cur.fetchall(), columns=colnames)
conn.close()

In [9]:
users_info

Unnamed: 0,grantee,table_catalog,table_schema,table_name,privilege_type
0,data_production,project32,public,hurricane_table,INSERT
1,data_production,project32,public,hurricane_table,SELECT
2,data_production,project32,public,hurricane_table,UPDATE
3,data_analyst,project32,public,hurricane_table,SELECT


## Porting data from CSV to PostgreSQL Table

In [10]:
from datetime import datetime

conn = psycopg2.connect(dbname="project32", user="postgres")
cur = conn.cursor()

with open("my_datasets/storm_data.csv","r", encoding = 'utf-8-sig') as f:
    next(f)
    reader = csv.reader(f)
    q = "INSERT INTO hurricane_table VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
    for row in reader:
        new_row = [row[0]]
        datetime_part = datetime(year = int(row[1]), month = int(row[2]), day = int(row[3]), 
                                 hour = int(row[4][0:2]), minute = int(row[4][2:4]))
        new_row.append(datetime_part)
        new_row.extend(row[5:14])
        cur.execute(q,new_row)
conn.commit()
conn.close()

In [11]:
conn = psycopg2.connect(dbname="project32", user="postgres")
cur = conn.cursor()
cur.execute(
"""
SELECT *
FROM hurricane_table
WHERE fid < 20
"""
)
colnames = [desc[0] for desc in cur.description]
check = pd.DataFrame(cur.fetchall(), columns=colnames)
conn.close()

In [12]:
check

Unnamed: 0,fid,recorded_datetime,btid,name,lat,long,wind_kts,pressure,cat,basin,shape_leng
0,1,1982-08-01 12:00:00,396,GILMA,18.2,-156.4,30,0,TD,Eastern Pacific,2.009975
1,2,1982-11-24 06:00:00,412,IWA,23.3,-158.4,80,0,H1,Eastern Pacific,2.683282
2,3,1950-08-16 06:00:00,10,HIKI,23.7,-157.5,75,0,H1,Eastern Pacific,1.081665
3,4,1950-08-17 06:00:00,10,HIKI,24.5,-161.6,75,0,H1,Eastern Pacific,0.943398
4,5,1963-08-06 18:00:00,132,NOTNAMED,19.7,-172.8,45,0,TS,Eastern Pacific,3.041381
5,6,1950-08-17 12:00:00,10,HIKI,24.0,-162.4,75,0,H1,Eastern Pacific,0.806226
6,7,1950-08-17 18:00:00,10,HIKI,23.6,-163.1,75,0,H1,Eastern Pacific,0.761577
7,8,1963-08-07 00:00:00,132,NOTNAMED,22.7,-173.3,45,0,TS,Eastern Pacific,1.104536
8,9,1950-08-18 00:00:00,10,HIKI,23.3,-163.8,75,0,H1,Eastern Pacific,1.029563
9,10,1963-09-18 12:00:00,134,IRAH,21.3,-157.6,25,0,TD,Eastern Pacific,1.749286


## Delete table

Let's drop the table to free space after finishing the project:

In [13]:
conn = psycopg2.connect(dbname="project32", user="postgres")
cur = conn.cursor()
cur.execute("DROP TABLE IF EXISTS hurricane_table")
conn.commit()
conn.close()