This code module picks up where the 00 module left off.  Data has been downloaded and the raw_data.csv file has been renamed to raw_data_lhbvlh_2014_2015.csv.

We can check by looking at the directory Output/raw_data subdirectory. If you ran the code in 00 module, the file will be named raw_data.csv. the file posted to gitub and included in the example utility in the container is named raw_data_lhbvlh_2014_2015.csv.  First we'll load required packages and then check the raw_data directory to make sure the file we want is there.


In [1]:
import os
import pandas as pd
import pandas.io.sql as pdsqlio
import psycopg2
import getpass
import re

try:
    os.chdir('M2_JNB/data')
except:
    print('no such directory')
print(os.getcwd())


! ls -lta # bash shell directory command


/home/bk/baseline/M2_JNB/data
total 12900
drwxrwxr-x 4 bk bk     4096 Oct 26 08:59 ..
drwxrwxr-x 2 bk bk     4096 Oct 25 19:40 .
-rw-rw-r-- 1 bk bk 13199167 Oct 24 16:10 raw_data_lhbvlh_2014_2015.csv


We have data to import, so next we will connect to the database and do some setup work. We begin by building a connection to the postgres database.


In [2]:
db=input('enter dbname: ')
user=input('enter user: ')
print('enter password: ')
password=getpass.getpass()
conn_str="host='localhost' dbname='"+db+"' user='"+user+"' password='"+password+"' options='-c search_path=sandbox,public'"
# the options allows us to add a schema called sandbox and be able to work in it
conn=psycopg2.connect(conn_str)
cur=conn.cursor()
cur.execute('select version()')
print(cur.fetchone()[0])

enter dbname:  bk
enter user:  bk


enter password: 


 ········


PostgreSQL 12.4 (Ubuntu 12.4-0ubuntu0.20.04.1) on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 9.3.0-10ubuntu2) 9.3.0, 64-bit


We are connected to the database.  For importing raw data that will be manipulated into a final format, lets put it into a sandbox schema to keep it separate from our 'good' data.


In [3]:
str_sql = "SELECT exists(select schema_name FROM information_schema.schemata WHERE schema_name = 'sandbox');"
cur.execute(str_sql)
conn.commit()
schema_exists =cur.fetchone()[0]
print(schema_exists)
if schema_exists==False:
    str_sql="create schema sandbox;"
    cur.execute(str_sql)
    conn.commit()
str_sql='select schema_name FROM information_schema.schemata;'
cur.execute(str_sql)
conn.commit()
print(cur.fetchall())


True
[('pg_toast',), ('pg_temp_1',), ('pg_toast_temp_1',), ('pg_catalog',), ('public',), ('information_schema',), ('sandbox',)]


Now that we have created or have confirmed the existance of the sandbox schema, we should be able to use it.


In [4]:
str_sql = "SELECT exists(select * FROM information_schema.tables WHERE table_schema = 'sandbox' and table_name = 'test');"
cur.execute(str_sql)
conn.commit()
table_exists =cur.fetchone()[0]
print(table_exists)
str_sql_drop = "drop table sandbox.test;"
if table_exists==True:
    cur.execute(str_sql_drop)

str_sql = "create table sandbox.test(name varchar(100), primary key(name));"
cur.execute(str_sql)
str_sql = "insert into sandbox.test(name) values('George');"
cur.execute(str_sql)
str_sql = "insert into sandbox.test(name) values('Jane');"
cur.execute(str_sql)
str_sql = "insert into sandbox.test(name) values('Judy');"
cur.execute(str_sql)
str_sql = "insert into sandbox.test(name) values('Elroy');"
cur.execute(str_sql)
str_sql = "insert into sandbox.test(name) values('Astro');"
cur.execute(str_sql)
str_sql = "insert into sandbox.test(name) values('Rosie');"
cur.execute(str_sql)
conn.commit()

str_sql = "select * from sandbox.test;"
cur.execute(str_sql)
conn.commit()
print(cur.fetchall())


False
[('George',), ('Jane',), ('Judy',), ('Elroy',), ('Astro',), ('Rosie',)]


Now that we've tested our connection string and can add tables to the sandbox schema, we can detlete the test table then pull in the MERRA2 data.


In [5]:
cur.execute(str_sql_drop)
conn.commit()

There are cases where the file might be larger than the computer memory running this notebook. We will get just the first few rows to determine the file format.

In [6]:
fname='raw_data_lhbvlh_2014_2015.csv'
if os.path.isfile(fname):
    data = pd.read_csv(fname, nrows=50)
else:
    raise SystemExit("Stop right there!")
    
x=data.dtypes
print(x)

lat       float64
lon       float64
time       object
H1000     float64
H850      float64
PBLTOP    float64
PS        float64
SLP       float64
T10M      float64
T2M       float64
T2MDEW    float64
T850      float64
TS        float64
U10M      float64
U2M       float64
U50M      float64
V10M      float64
V2M       float64
V50M      float64
dtype: object


As noted in the 00 file, the MERRA2 data is fully described in https://gmao.gsfc.nasa.gov/pubs/docs/Bosilovich785.pdf. 

We can use the dtypes to create a table in our postgres database. 


In [7]:
print(fname)

raw_data_lhbvlh_2014_2015.csv


In [8]:
# note this is strictly a drop and replace, it will not append data as written
def maketable(fname):
    type_map={'float64':'real',
          'int64':'bigint',
          'object':'timestamp'}
    newtablename=re.sub('.csv','',fname,flags=re.I)
    cur.execute("SELECT EXISTS(SELECT * FROM information_schema.tables "+
                "WHERE table_schema = 'sandbox' AND table_name = '"+newtablename+"');")
    if cur.fetchone()[0]:
        cur.execute("DROP TABLE "+newtablename+';')
        conn.commit()
        msg='deleted and replaced'
    else:
        msg='created new'
    data = pd.read_csv(fname,nrows=50)
    types=data.dtypes
    str_sql='CREATE TABLE '+newtablename+' ('
    len_x=len(data.columns.tolist())
    for x in data.columns.tolist():
        str_sql=str_sql+x+' '+  type_map[str(types[x])]
        if(x!=data.columns.tolist()[len_x-1]):
            str_sql=str_sql + ', '
        else :
            str_sql=str_sql+ ');'
    cur.execute(str_sql)
    conn.commit()
    return [msg,str_sql]  

maketable(fname)


['deleted and replaced',
 'CREATE TABLE raw_data_lhbvlh_2014_2015 (lat real, lon real, time timestamp, H1000 real, H850 real, PBLTOP real, PS real, SLP real, T10M real, T2M real, T2MDEW real, T850 real, TS real, U10M real, U2M real, U50M real, V10M real, V2M real, V50M real);']

The table has been created based on the structure of the data, so no we can import the csv directly into the database.


In [9]:
tablename=re.sub('.csv','',fname,flags=re.I)
f=open(fname)
cur.copy_expert("copy sandbox."+tablename+" from STDOUT delimiter ',' csv header",f)
conn.commit()
cur.close()

Now that the data is in the database, we can pull what we want into pandas, or continue working with data in teh database and doing data wrangling and munging directly with sql.

Here's an example of pullin data into pandas using its ability to read sql.


In [10]:
str_sql = "select * from sandbox."+tablename+" limit 5;"
data=pdsqlio.read_sql_query(str_sql,conn)
data

Unnamed: 0,lat,lon,time,h1000,h850,pbltop,ps,slp,t10m,t2m,t2mdew,t850,ts,u10m,u2m,u50m,v10m,v2m,v50m
0,48.0,5.0,2015-06-30 00:30:00,174.72054,1569.6932,97582.89,98326.34,102010.19,289.60745,288.23163,285.75446,287.7547,287.01468,-2.73212,-1.240577,-5.883302,-1.03919,-0.465315,-2.214216
1,48.0,5.0,2015-06-30 01:30:00,172.26204,1566.8104,97558.9,98299.18,101982.32,289.08398,287.6558,285.2584,287.7593,286.4475,-2.85362,-1.276578,-6.057842,-0.621774,-0.271498,-1.292917
2,48.0,5.0,2015-06-30 02:30:00,170.42476,1564.765,97539.94,98278.734,101960.875,288.63873,287.1728,284.87598,287.673,285.96463,-2.90217,-1.28011,-6.082373,-0.274003,-0.115914,-0.552742
3,48.0,5.0,2015-06-30 03:30:00,169.84496,1563.7583,97535.26,98273.61,101954.734,288.12305,286.68875,284.58414,287.60474,285.54578,-2.865928,-1.25251,-5.957701,-0.065182,-0.024377,-0.117637
4,48.0,5.0,2015-06-30 04:30:00,170.25699,1563.744,97541.91,98276.055,101958.875,288.37375,287.64316,285.25012,287.57114,287.26398,-2.613574,-1.346419,-5.817614,0.062035,0.035123,0.142819
