# Data Sources

_Orion_ is Peak's go-to framework for reading, writing and transferring data across various sources.

In this tutorial we will go over some of the most fundamental operations we perform at Peak, such as reading data from and writing data to Redshift, S3 and AIS.

## 1. Reading Data In

### 1.1 S3

In [1]:
from orion.sources import S3Source

In [2]:
weather = S3Source(
    bucket='kilimanjaro-prod-datalake', 
    key='newstarter/uploads/weather/1581525070376_Peak_weather.csv'
).read_csv()
weather.head(5)

Unnamed: 0,origin,year,month,day,hour,temp,dewp,humid,wind_dir,wind_speed,wind_gust,precip,pressure,visib,time_hour
0,EWR,2013,1,1,1,39.02,26.06,59.37,270.0,10.35702,,0.0,1012.0,10.0,2013-01-01T06:00:00Z
1,EWR,2013,1,1,2,39.02,26.96,61.63,250.0,8.05546,,0.0,1012.3,10.0,2013-01-01T07:00:00Z
2,EWR,2013,1,1,3,39.02,28.04,64.43,240.0,11.5078,,0.0,1012.5,10.0,2013-01-01T08:00:00Z
3,EWR,2013,1,1,4,39.92,28.04,62.21,250.0,12.65858,,0.0,1012.2,10.0,2013-01-01T09:00:00Z
4,EWR,2013,1,1,5,39.02,28.04,64.43,260.0,12.65858,,0.0,1011.9,10.0,2013-01-01T10:00:00Z


### 1.2 Redshift

Unlike S3, Redshift requires access to certain environment variables before any read/write operation can be executed. We pull those environment variables into our notebook using the _load_env()_ function.

In [3]:
from orion.contrib.envs import load_env
load_env()

True

In [4]:
from orion.sources import RedshiftSource

#### 1.2.1 Using Query

In [5]:
sql_query = """
    select 
        *
    from
        stage.weather
    """

weather = RedshiftSource(query=sql_query).read_csv()
weather.head(5)

Unnamed: 0,origin,year,month,day,hour,temp,dewp,humid,wind_dir,wind_speed,wind_gust,precip,pressure,visib,time_hour
0,EWR,2013,1,1,5,39.02,28.04,64.43,260,12.65858,,0,1011.9,10,2013-01-01T10:00:00Z
1,EWR,2013,1,2,14,33.98,10.94,37.86,310,17.261699999999998,26.46794,0,1017.8,10,2013-01-02T19:00:00Z
2,EWR,2013,1,3,22,28.94,17.06,60.69,230,10.35702,,0,1019.0,10,2013-01-04T03:00:00Z
3,EWR,2013,1,5,6,33.08,17.96,53.36,250,10.35702,,0,1022.4,10,2013-01-05T11:00:00Z
4,EWR,2013,1,6,14,48.02,24.98,40.22,250,14.96014,18.41248,0,1015.1,10,2013-01-06T19:00:00Z


#### 1.2.2 Using File

In [6]:
sql_file = "resources/weather.sql"

weather = RedshiftSource(query_file=sql_file).read_csv()
weather.head(5)

Unnamed: 0,origin,year,month,day,hour,temp,dewp,humid,wind_dir,wind_speed,wind_gust,precip,pressure,visib,time_hour
0,EWR,2013,1,1,13,39.2,28.4,69.67,330,16.11092,,0,,10,2013-01-01T18:00:00Z
1,EWR,2013,1,2,21,30.02,14.0,50.84,270,5.7539,,0,1021.2,10,2013-01-03T02:00:00Z
2,EWR,2013,1,4,5,30.92,21.02,66.34,240,11.5078,,0,1018.4,10,2013-01-04T10:00:00Z
3,EWR,2013,1,5,13,44.06,19.94,37.79,290,11.5078,19.56326,0,1023.6,10,2013-01-05T18:00:00Z
4,EWR,2013,1,6,21,42.08,26.06,52.73,270,10.35702,,0,1016.7,10,2013-01-07T02:00:00Z
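
The two options differ only in where the SQL text comes from. As a rough sketch, assuming `query_file` simply reads the SQL text from the given path (an assumption about Orion's behaviour, not something this notebook confirms), the same result could be reached by loading the file yourself. The helper `load_query` is hypothetical and uses only the standard library:

```python
from pathlib import Path


def load_query(path):
    """Return the SQL text stored in `path` (hypothetical helper, not part of Orion)."""
    return Path(path).read_text(encoding="utf-8").strip()


# Usage, assuming resources/weather.sql exists:
# sql_query = load_query("resources/weather.sql")
# weather = RedshiftSource(query=sql_query).read_csv()
```

Keeping longer queries in `.sql` files tends to make them easier to review and reuse than inline strings.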


## 2. Writing Data Out 

### 2.1 S3

In [7]:
from orion.sources import S3Source

In [8]:
S3Source(
    bucket="kilimanjaro-prod-datalake", 
    key="newstarter/datascience/weather.csv"
).write_csv(weather, index=False)

### 2.2 Redshift

In order to write to Redshift you need a special environment variable called _Redshift IAM Role_.

To get this variable you will have to raise a support ticket (https://peak-bi.atlassian.net/servicedesk/customer/portals).

Once you receive it from a DevOps team member, you will need to do the following:

1. Click on the "+" icon in the top left corner.
2. Click on Terminal.
3. Type in "cd ~/" to make sure you are in your home directory.
4. Type in "nano .env".
5. At the bottom of the file enter: "export REDSHIFT_IAM_ROLE=redshift-iam-role", where "redshift-iam-role" is the _Redshift IAM Role_ you received.
6. Save changes (CTRL+X > "Y" > ENTER).
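
For illustration, the sketch below shows how a line such as `export REDSHIFT_IAM_ROLE=...` in a `.env` file could end up in the process environment. It is not Orion's _load_env()_ implementation; the functions `parse_env_lines` and `load_env_file` and their parsing rules are assumptions made for this example.

```python
import os


def parse_env_lines(lines):
    """Parse `export KEY=value` / `KEY=value` lines into a dict (illustrative only)."""
    values = {}
    for raw in lines:
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        if line.startswith("export "):
            line = line[len("export "):]
        key, sep, value = line.partition("=")
        if not sep:
            continue  # ignore lines without an assignment
        values[key.strip()] = value.strip().strip('"').strip("'")
    return values


def load_env_file(path):
    """Read `path` and copy its variables into os.environ (hypothetical helper)."""
    with open(path, encoding="utf-8") as handle:
        values = parse_env_lines(handle)
    os.environ.update(values)
    return values


# Usage, assuming ~/.env exists:
# load_env_file(os.path.expanduser("~/.env"))
# os.environ["REDSHIFT_IAM_ROLE"]
```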

#### 2.2.1 Existing Table

In [9]:
from orion.contrib.envs import load_env
load_env()

True

In [10]:
from orion.sources import RedshiftSource

We create a fake data sample to show how new data can be appended to an existing table.

In [11]:
import numpy as np
import pandas as pd

from datetime import datetime

In [12]:
new_sample = pd.DataFrame([
    {
        'origin':np.random.choice(weather['origin']),
        'year': datetime.now().year,
        'month': datetime.now().month,
        'day': datetime.now().day,
        'hour': datetime.now().hour,
        'temp': np.random.choice(weather['temp']),
        'dewp': np.random.choice(weather['dewp']),
        'humid': np.random.choice(weather['humid']),
        'wind_dir': np.random.choice(weather['wind_dir']),
        'wind_speed': np.random.choice(weather['wind_speed']),
        'wind_gust': np.random.choice(weather['wind_gust']),
        'precip': np.random.choice(weather['precip']),
        'pressure': np.random.choice(weather['pressure']),
        'visib': np.random.choice(weather['visib']),
        'time_hour': datetime.now().strftime(format="%Y-%m-%dT%H:%M:%S%Z")
    }
], columns=weather.columns)

new_sample.head()

Unnamed: 0,origin,year,month,day,hour,temp,dewp,humid,wind_dir,wind_speed,wind_gust,precip,pressure,visib,time_hour
0,JFK,2020,2,27,17,66.92,62.06,84.92,,8.05546,,0,1020.7,10,2020-02-27T17:01:42
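
Note that the existing rows end their `time_hour` values with `Z`, while the sample above does not: `%Z` formats to an empty string for a naive `datetime`. If matching the existing format matters, one option (a sketch, not something the original notebook does) is to build the timestamp from a timezone-aware UTC time and write the `Z` literally:

```python
from datetime import datetime, timezone

# %Z is empty for a naive datetime, so no suffix is produced.
naive_suffix = datetime.now().strftime("%Z")

# A timezone-aware UTC timestamp with a literal "Z", matching e.g. 2013-01-01T06:00:00Z.
time_hour = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
print(repr(naive_suffix), time_hour)
```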


In [13]:
RedshiftSource(schema='stage', table='weather').write_csv(new_sample, index=False)

#### 2.2.2 New Table

_Orion_ will automatically create a new table matching the schema of the provided DataFrame if the `table` parameter does not match an existing table.

The `overwrite=True` flag truncates the table and copies the contents of the DataFrame into it.

In [14]:
RedshiftSource(table="new_weather", schema="stage", overwrite=True).write_csv(weather, index=False)
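
To make the difference between appending and overwriting concrete, here is a small stand-in that uses Python's built-in `sqlite3` rather than Redshift. The `write_rows` helper and the in-memory table are hypothetical; they only mimic the behaviour described in this section (create the table if missing, append by default, empty it first when `overwrite=True`) and say nothing about how Orion implements it.

```python
import sqlite3


def write_rows(conn, table, rows, overwrite=False):
    """Create `table` if needed, optionally empty it, then insert `rows`."""
    conn.execute(f"CREATE TABLE IF NOT EXISTS {table} (origin TEXT, temp REAL)")
    if overwrite:
        conn.execute(f"DELETE FROM {table}")  # SQLite has no TRUNCATE statement
    conn.executemany(f"INSERT INTO {table} VALUES (?, ?)", rows)
    conn.commit()


def count_rows(conn, table):
    """Return the number of rows currently in `table`."""
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


conn = sqlite3.connect(":memory:")
write_rows(conn, "new_weather", [("EWR", 39.02), ("JFK", 66.92)])
write_rows(conn, "new_weather", [("EWR", 39.92)])                  # appends
appended = count_rows(conn, "new_weather")
write_rows(conn, "new_weather", [("EWR", 33.98)], overwrite=True)  # replaces contents
overwritten = count_rows(conn, "new_weather")
```

After the second call the table holds three rows; after the `overwrite=True` call it holds only the one row just written.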

### 2.3 AIS

Make sure to study Notebook **4. AIS API KEY** before proceeding.

In [15]:
import os

In [16]:
from orion.contrib.envs import load_env
load_env()

True

In [17]:
from orion.sources import AISSource

In [18]:
AISSource(
    target="weather/weather_20200227.csv", 
    key=os.environ['AIS_API_KEY'], 
    token=os.environ['AIS_API_KEY']
).write_csv(weather, index=False)

You can verify that the upload succeeded by going to _AIS > Outcomes > Downloads_.