In [1]:
#import modules
%load_ext sql
import timeit
import configparser
import matplotlib.pyplot as plt
import pandas as pd
import psycopg2
import boto3
import json
import datetime

#### we set up our redshift (postgress) datawarehouse using some specifications from our config files in aws cloud

In [2]:
#assign credentials from ini/config file into variables
config = configparser.ConfigParser()
config.read_file(open('dwh.ini'))
KEY=config.get('AWS','key')
SECRET= config.get('AWS','secret')

DWH_DB= config.get("DWH","DWH_DB")
DWH_DB_USER= config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD= config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT = config.get("DWH","DWH_PORT")

In [3]:
#creates a table/dataframe of resources metadata
config = configparser.ConfigParser()
config.read_file(open('dwh.ini'))

KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

(DWH_DB_USER, DWH_DB_PASSWORD, DWH_DB)

pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
             })

Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,multi-node
1,DWH_NUM_NODES,4
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,dwhCluster
4,DWH_DB,dwh
5,DWH_DB_USER,dwhuser
6,DWH_DB_PASSWORD,Passw0rd
7,DWH_PORT,5439
8,DWH_IAM_ROLE_NAME,dwhRole


In [4]:
#instantiate various aws resources and clients to be used later
ec2 = boto3.resource('ec2',
                    region_name="us-west-2",
                    aws_access_key_id=KEY,
                    aws_secret_access_key=SECRET)

s3 = boto3.resource('s3',
                    region_name="us-west-2",
                    aws_access_key_id=KEY,
                    aws_secret_access_key=SECRET)

iam = boto3.client('iam',
                    region_name="us-west-2",
                    aws_access_key_id=KEY,
                    aws_secret_access_key=SECRET)

redshift = boto3.client('redshift',
                    region_name="us-west-2",
                    aws_access_key_id=KEY,
                    aws_secret_access_key=SECRET)

In [5]:
#create a role that will allow redshift(postgress) to read/access s3 bucket where our text files is.
print('1.2 Attaching Policy')

iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                      PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

print("1.3 Get the IAM role ARN")

1.2 Attaching Policy
1.3 Get the IAM role ARN


In [6]:
#Get and print the IAM role ARN created earlier
print('1.3 Get the IAM role ARN')
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']
print(roleArn)

1.3 Get the IAM role ARN
arn:aws:iam::650301586797:role/dwhRole


In [7]:
#create redshift (datawarehouse) cluster, catch error if cluster fail to create.
try:
    response = redshift.create_cluster(  
        #specifying datawarehouse specifications like number of nodes, clustertype, etc
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        #adding parameters for identifiers & credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        #add parameter for role (to allow s3 access)
        IamRoles=[roleArn]
    )
except Exception as e:
    print(e)

In [8]:
#run this to see cluster status
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

  This is separate from the ipykernel package so we can avoid doing imports until


Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,creating
3,MasterUsername,dwhuser
4,DBName,dwh
5,VpcId,vpc-023f22a362563b12a
6,NumberOfNodes,4


In [9]:
#only run when the cluster is available. (i need the endpoint for later use)
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

DWH_ENDPOINT ::  dwhcluster.cdpkfsp84e6y.us-west-2.redshift.amazonaws.com
DWH_ROLE_ARN ::  arn:aws:iam::650301586797:role/dwhRole


In [10]:
#configure security related details on vpc (virtual private cloud), like opening up port for all incoming traffic
try:
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    
    defaultSg.authorize_ingress(
        GroupName= defaultSg.group_name, 
        CidrIp='0.0.0.0/0',  
        IpProtocol='TCP', 
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-08e1b2c253bf59fa2')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


### we test our datawarehouse redshift(postgress) cluster connection

In [11]:
def connect_to_db():
    """
    - connects to the database
    - Returns the connection and cursor 
    """  
    try:
    # connect to database created earlier
        conn_ = psycopg2.connect(host=DWH_ENDPOINT, dbname=DWH_DB, user=DWH_DB_USER, password=DWH_DB_PASSWORD, port=DWH_PORT)
        cur_ = conn_.cursor()
        conn_.commit()
    except Exception as e:
        print("cant connect",'\n',e)
    
    return conn_, cur_

In [12]:
#we create a connection string
conn_string = "postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER,DWH_DB_PASSWORD,DWH_ENDPOINT,DWH_PORT,DWH_DB)

In [13]:
#we print out our connection string
print(conn_string)
%sql $conn_string

postgresql://dwhuser:Passw0rd@dwhcluster.cdpkfsp84e6y.us-west-2.redshift.amazonaws.com:5439/dwh


In [14]:
# when the conn variable executes, a closed argument = 0 indicates that, we are good to go, our connection is open
conn, cur = connect_to_db()
conn

<connection object at 0x000001D05242D948; dsn: 'user=dwhuser password=xxx dbname=dwh host=dwhcluster.cdpkfsp84e6y.us-west-2.redshift.amazonaws.com port=5439', closed: 0>

#### we start to consume the files in our data lake, create models, and begin ingestion

In [15]:
#lets see what we have in our s3 corteve bucket
s3_object = s3.Bucket('corteve')
for obj in s3_object.objects.filter(Prefix="yld_data"):
    print(obj)

s3.ObjectSummary(bucket_name='corteve', key='yld_data/US_corn_grain_yield.txt')


In [16]:
#create staging table in redshift so that we can bulk load our files into 
#create dev table as well

#create  2 schemas, dev and stage, store in variable, to be called later
create_stage_schema = "CREATE SCHEMA IF NOT EXISTS stage"
create_dev_schema = "CREATE SCHEMA IF NOT EXISTS dev"


#create table of interests  in dev and stage respectively, store in variable, to be called later
create_weather_table_stage = """CREATE TABLE IF NOT EXISTS stage.weather (date int NOT NULL PRIMARY KEY SORTKEY,
                   max_temp smallint,
                   min_temp smallint,
                   precipitation smallint,
                   weather_station varchar(10)
                   )"""

create_weather_table_dev = """CREATE TABLE IF NOT EXISTS dev.weather (id int PRIMARY KEY IDENTITY(1,1),
                   date int NOT NULL SORTKEY,
                   max_temp smallint,
                   min_temp smallint,
                   precipitation smallint,
                   weather_station varchar(10)
                   )"""

create_yield_table_stage = """CREATE TABLE IF NOT EXISTS stage.yield (year smallint NOT NULL PRIMARY KEY  SORTKEY,
                   total_harvest int  
                   )"""

create_yield_table_dev = """CREATE TABLE IF NOT EXISTS dev.yield (year smallint NOT NULL PRIMARY KEY  SORTKEY,
                   total_harvest int  
                   )"""

In [17]:
#we execute some of our variables. catch error if any
try:
    cur.execute(create_stage_schema)
    cur.execute(create_dev_schema)
    cur.execute(create_weather_table_stage)
    cur.execute(create_weather_table_dev)
    cur.execute(create_yield_table_stage)
    cur.execute(create_yield_table_dev)
    conn.commit()
    print("tables successfully created")
except Exception as e:
    print('an error occured while creating tables', '\n', e)
    conn.rollback() #we undo any transaction that must have been done earlier to prevent InFailedSqlTransaction error

tables successfully created


In [18]:
#we copy all wx_data files in s3 into our weather staging table. again, we catch error using try and except blocks
query = """
        copy stage.weather from 's3://corteve/wx_data2/' 
        credentials 'aws_iam_role={}' 
        region 'us-west-2' 
        delimiter ' '
        dateformat AS 'YYYY/MM/DD'
        """.format(DWH_ROLE_ARN)
try:
    cur.execute(query)
    conn.commit()
    print('load data successfully')
except Exception as e:
    print('cant load txt file into redshift', '\n', e)
    conn.rollback()

load data successfully


In [19]:
#we copy all yld_data files into our yield staging table
query = """
        copy stage.yield from 's3://corteve/yld_data/' 
        credentials 'aws_iam_role={}' 
        region 'us-west-2' 
        delimiter '\t'
        """.format(DWH_ROLE_ARN)
try:
    cur.execute(query)
    conn.commit()
    print('load data successfully')
except Exception as e:
    print('cant load txt file into redshift', '\n', e)
    conn.rollback()

load data successfully


In [20]:
## lets find top 5 duplicates in our tables
q = """select distinct date,
		count(*) as dup
from stage.weather
group by date, max_temp, min_temp, precipitation
order by 2 desc 
limit 5"""

%sql $q

 * postgresql://dwhuser:***@dwhcluster.cdpkfsp84e6y.us-west-2.redshift.amazonaws.com:5439/dwh
5 rows affected.


date,dup
19920106,16
19861227,12
20071008,11
19950813,11
19990705,10


#### lets start inserting into dev while removing duplicates

In [21]:
#we insert into dev, remove duplicates, and we time our ingestion process as per request
start_time = timeit.timeit()
insert_into_weather = """insert into dev.weather (date, max_temp, min_temp, precipitation, weather_station) 
                            select distinct s.date,
                                s.max_temp,
                                s.min_temp, 
                                s.precipitation,
                                case 
                                     when weather_station = 'USC0011' then 'Nebraska' 
                                     when weather_station = 'USC0012' then 'Iowa' 
                                     when weather_station = 'USC0013' then 'Illinois' 
                                     when weather_station = 'USC0025' then 'Indiana' 
                                     when weather_station = 'USC0033' then 'Ohio' 
                                 end as weather_station
                                 
                        from stage.weather as s
                        group by 1,2,3,4,5
                    """  
try:
    cur.execute(insert_into_weather)
    conn.commit()
    print('data inserted successfully')
except Exception as e:
    print('cant insert data', '\n', e)
    conn.rollback()
    
end_time = timeit.timeit()

data inserted successfully


In [23]:
# we output log as requested
cur.execute('select count(*) from dev.weather')
records = cur.fetchall()[0][0]
runtime = round(end_time - start_time,5)
print('here is the time taken for ingestion in millisecs for dev.weather table: ', runtime)
print(records, 'records were inserted')

here is the time taken for ingestion in millisecs for dev.weather table:  0.00167
1618131 records were inserted


In [24]:
#we repeat the same for yield dev table
start_time = timeit.timeit()

insert_into_yield = """insert into dev.yield (year, total_harvest) 
                            select distinct year,
                                total_harvest
                        from stage.yield
                        group by 1,2
                    """  
try:
    cur.execute(insert_into_yield)
    conn.commit()
    print('data inserted successfully')
except Exception as e:
    print('cant insert data', '\n', e)
    conn.rollback()
    
end_time = timeit.timeit()

data inserted successfully


In [25]:
#log output
cur.execute('select count(*) from dev.yield')
records = cur.fetchall()[0][0]
runtime = round(end_time - start_time,5)
print('here is the time taken for ingestion in millisecs for dev.yield table: ', runtime)
print(records, 'records were inserted')

here is the time taken for ingestion in millisecs for dev.weather table:  0.00604
30 records were inserted


#### Analysis part

In [26]:
#Average maximum temperature (in degrees Celsius)

q = """select distinct(cast(left(cast(date as text),4) as int)) as year,
        weather_station,
        round(avg(max_temp/10.0),2) as average_max_temp
        from dev.weather
        where max_temp <> -9999
        group by 1,2
        order by 1,2,3
        limit 10
        """
%sql $q

 * postgresql://dwhuser:***@dwhcluster.cdpkfsp84e6y.us-west-2.redshift.amazonaws.com:5439/dwh
10 rows affected.


year,weather_station,average_max_temp
1985,Illinois,14.28
1985,Indiana,15.7
1985,Iowa,16.5
1985,Nebraska,16.15
1985,Ohio,15.99
1986,Illinois,15.01
1986,Indiana,17.6
1986,Iowa,17.35
1986,Nebraska,17.32
1986,Ohio,16.43


In [27]:
#Average minimum temperature (in degrees Celsius)
q = """select distinct(cast(left(cast(date as text),4) as int)) as year,
        weather_station,
        round(avg(min_temp/10.0),2) as average_min_temp
        from dev.weather
        where min_temp <> -9999
        group by 1,2
        order by 1,2,3
        limit 10
        """
%sql $q

 * postgresql://dwhuser:***@dwhcluster.cdpkfsp84e6y.us-west-2.redshift.amazonaws.com:5439/dwh
10 rows affected.


year,weather_station,average_min_temp
1985,Illinois,2.39
1985,Indiana,2.22
1985,Iowa,5.13
1985,Nebraska,4.69
1985,Ohio,4.32
1986,Illinois,4.09
1986,Indiana,4.07
1986,Iowa,6.32
1986,Nebraska,5.93
1986,Ohio,5.32


In [28]:
#Total accumulated precipitation (in centimeters)
q = """select distinct(cast(left(cast(date as text),4) as int)) as year,
        weather_station,
        round(sum(precipitation/10.0),2) as total_precipitation
        from dev.weather
        where precipitation <> -9999
        group by 1,2
        order by 1,2,3
        limit 10
        """
%sql $q

 * postgresql://dwhuser:***@dwhcluster.cdpkfsp84e6y.us-west-2.redshift.amazonaws.com:5439/dwh
10 rows affected.


year,weather_station,total_precipitation
1985,Illinois,18388.1
1985,Indiana,27766.9
1985,Iowa,42818.2
1985,Nebraska,40365.6
1985,Ohio,25133.7
1986,Illinois,22759.9
1986,Indiana,34250.6
1986,Iowa,36059.7
1986,Nebraska,30398.4
1986,Ohio,24907.3


#### design data model to ingest our analytics result to

In [29]:
#we create our tables for the analytics results
create_max_temp_table_dev = """CREATE TABLE IF NOT EXISTS dev.max_temp (id int PRIMARY KEY IDENTITY(1,1),
                   year smallint NOT NULL SORTKEY,
                   weather_station varchar(10),
                   avg_max_temp float(4)
                    )"""

create_min_temp_table_dev = """CREATE TABLE IF NOT EXISTS dev.min_temp (id int PRIMARY KEY IDENTITY(1,1),
                   year smallint NOT NULL SORTKEY,
                   weather_station varchar(10),
                   avg_min_temp float(4)
                    )"""

create_total_precip_table_dev = """CREATE TABLE IF NOT EXISTS dev.total_precip (id int PRIMARY KEY IDENTITY(1,1),
                   year smallint NOT NULL SORTKEY,
                   weather_station varchar(10),
                   total_precipitation float(4)
                    )"""

In [30]:
#we execute our table creation query
try:
    cur.execute(create_max_temp_table_dev)
    cur.execute(create_min_temp_table_dev)
    cur.execute(create_total_precip_table_dev)
    conn.commit()
    print("tables successfully created")
except Exception as e:
    print('an error occured while creating tables', '\n', e)
    conn.rollback() #we undo any transaction that must have been done earlier to prevent InFailedSqlTransaction error

tables successfully created


In [31]:
#insert into our max_temp table
insert_into_max_temp = """insert into dev.max_temp (year, weather_station, avg_max_temp) 
                            select distinct(cast(left(cast(date as text),4) as int)) as year,
                                    weather_station,
                                    round(avg(max_temp/10.0),2) as average_max_temp
                            from dev.weather
                            where max_temp <> -9999
                            group by 1,2
                    """  
try:
    cur.execute(insert_into_max_temp)
    conn.commit()
    print('data inserted successfully')
except Exception as e:
    print('cant insert data', '\n', e)
    conn.rollback()

data inserted successfully


In [32]:
#insert into our min_temp table

insert_into_min_temp = """insert into dev.min_temp (year, weather_station, avg_min_temp) 
                            select distinct(cast(left(cast(date as text),4) as int)) as year,
                                    weather_station,
                                    round(avg(min_temp/10.0),2) as average_min_temp
                            from dev.weather
                            where min_temp <> -9999
                            group by 1,2
                    """  
try:
    cur.execute(insert_into_min_temp)
    conn.commit()
    print('data inserted successfully')
except Exception as e:
    print('cant insert data', '\n', e)
    conn.rollback()

data inserted successfully


In [33]:
#insert into our total_precip table
insert_into_total_precip = """insert into dev.total_precip (year, weather_station, total_precipitation) 
                            select distinct(cast(left(cast(date as text),4) as int)) as year,
                                    weather_station,
                                    round(sum(precipitation/10.0),2) as total_precipitation
                                    from dev.weather
                                    where precipitation <> -9999
                                    group by 1,2
                                """  
try:
    cur.execute(insert_into_total_precip)
    conn.commit()
    print('data inserted successfully')
except Exception as e:
    print('cant insert data', '\n', e)
    conn.rollback()

data inserted successfully


In [None]:
###comments by me.

#the file was reprocessed with the assumption that the first 6 characters of the filename indicates where it came from.
#i uploaded a fileprocessor.py which reprocessed the file to github as  well.
#for instance, the file USC00110072.txt first 7 characters('USC0011') came  from 'Nebraska' 

#i wasnt given enough time to answer the last question. I have to research, and learn a bit of django or flask,  and rest api
#to annswer that part. I'm currently on it.

#this is just a rough work, there's plenty of room for improvement. There are repetitions that cud be avoided via using functions

#this project was done in aws redshift, which is just a modified postgress