# ----------------------------------------------------------------------------------------------------------
# AWS DATA INGESTION
# ----------------------------------------------------------------------------------------------------------

___

<a> <img src='img\architecture_v2.png' width="1000" /></a>
___

### Import libraries
Make sure you install each of module / library 

* pip install 'module name' on your command prompt

In [1]:
import boto3
import io 
import pandas as pd
import json
import time
import datetime
import numpy as np
import s3fs
import awswrangler
from datetime import datetime
from datetime import timedelta


## 1. Excel / Flat Files Data Sources

### Below is the architecture plan:

___

<a> <img src='img\xl_ingest_pic_v2.png' width="1000" /></a>
___

In [4]:
#define bucket address where your data is placed to be read
bucket_name = 'mst-lab-data'
object_key = 'input/user_007/rawtest2.xlsx'
# s3 = boto3.client('s3', aws_access_key_id=aws_id, aws_secret_access_key=aws_secret) #not secured way consider using AWS CLI
s3 = boto3.client('s3') #to call library which enable we talk to aws environment (S3)
obj = s3.get_object(Bucket=bucket_name, Key=object_key) #
obj

{'ResponseMetadata': {'RequestId': 'D9AC0B7764E5F257',
  'HostId': 'ovPbQeaYpKK7hlX3BWSiqn880aML/xDQUyQtAQq8P5naGCpY5ITXCNHkq+v8FRuZZOj8JqWSc6Y=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'ovPbQeaYpKK7hlX3BWSiqn880aML/xDQUyQtAQq8P5naGCpY5ITXCNHkq+v8FRuZZOj8JqWSc6Y=',
   'x-amz-request-id': 'D9AC0B7764E5F257',
   'date': 'Wed, 28 Aug 2019 10:22:40 GMT',
   'last-modified': 'Wed, 28 Aug 2019 02:20:27 GMT',
   'etag': '"ccf2c7325f7ab9b9c7976e00f0d5c981"',
   'accept-ranges': 'bytes',
   'content-type': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
   'content-length': '9772',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'AcceptRanges': 'bytes',
 'LastModified': datetime.datetime(2019, 8, 28, 2, 20, 27, tzinfo=tzutc()),
 'ContentLength': 9772,
 'ETag': '"ccf2c7325f7ab9b9c7976e00f0d5c981"',
 'ContentType': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
 'Metadata': {},
 'Body': <botocore.response.StreamingBody at 0x2498860e24

In [5]:
#define bucket address where your data is placed to be read
bucket_name = 'mst-lab-data'
object_key = 'input/user_007/rawtest2.xlsx'
# s3 = boto3.client('s3', aws_access_key_id=aws_id, aws_secret_access_key=aws_secret) #not secured way consider using AWS CLI
s3 = boto3.client('s3') #to call library which enable we talk to aws environment (S3)
obj = s3.get_object(Bucket=bucket_name, Key=object_key) #
data = obj['Body'].read()
df = pd.read_excel(io.BytesIO(data), encoding='utf-8')
df.head()

Unnamed: 0,date,site,prod
0,2019-01-01,x,200
1,2019-01-02,x,100


## Check blank columns and blank rows

Re-read the data

In [7]:
#define bucket address where your data is placed to be read
bucket_name = 'mst-lab-data'
object_key = 'input/user_007/rawtest3.xlsx'
# s3 = boto3.client('s3', aws_access_key_id=aws_id, aws_secret_access_key=aws_secret) #not secured way consider using AWS CLI
s3 = boto3.client('s3') #to call library which enable we talk to aws environment (S3)
obj = s3.get_object(Bucket=bucket_name, Key=object_key) #
data = obj['Body'].read()
datarawcheck = pd.read_excel(io.BytesIO(data), encoding='utf-8',sheet_name='input', header=None)
datarawcheck.head()

Unnamed: 0,0,1,2,3
0,date,site,,prod
1,2019-01-01 00:00:00,x,,200
2,2019-01-02 00:00:00,x,,100
3,,,,
4,2019-01-03 00:00:00,x,,400


Blank column checking

In [8]:
cols_skip=[]
# pd.isna(datarawcheck[2])
for i in range(datarawcheck.shape[1]):
        if (((pd.isna(datarawcheck[i])).nunique()) == 1 ) & ((((pd.isna(datarawcheck[i])).unique())[0])==True):
            cols_skip.append(i)
        else:
            continue
cols_skip


[2]

Get non blank columns

In [9]:
cols = [i for i in range(len(datarawcheck.columns)) if i not in cols_skip]
cols

[0, 1, 3]

Blank row checking

In [10]:
rows_skip=[]
for i in range(len(datarawcheck.iloc[i])):
    if (((pd.isna(datarawcheck.iloc[i])).nunique()) == 1 ) & ((((pd.isna(datarawcheck.iloc[i])).unique())[0])==True):
        rows_skip.append(i)
    else:
        continue
rows_skip

[3]

In [11]:
rows=rows_skip

In [12]:
cleandata = pd.read_excel(io.BytesIO(data),
                            sheet_name='input',
                            skiprows=rows,
                            usecols=cols)
cleandata

Unnamed: 0,date,site,prod
0,2019-01-01,x,200
1,2019-01-02,x,100
2,2019-01-03,x,400


### How to generate timestamp for current event
It'll be usefull for us if we want to get the current time's event

In [13]:
dateTimeObj = datetime.now()
print(dateTimeObj)

2019-08-28 17:27:00.338582


### Change it into string so we can use it as the name of data if we want

In [14]:
yr= str(dateTimeObj.year)
mo= str(dateTimeObj.month)
day= str(dateTimeObj.day)
hr= str(dateTimeObj.hour)
mn= str(dateTimeObj.minute)
sc= str(dateTimeObj.second)
up_filename=yr+mo+day+hr+mn+sc+'.csv'
up_filename

'201982817270.csv'

### Add the uploadername and the timestamp they upload

In [15]:
cleandata['timestamp']=dateTimeObj
cleandata['useropluad']=object_key
cleandata

Unnamed: 0,date,site,prod,timestamp,useropluad
0,2019-01-01,x,200,2019-08-28 17:27:00.338582,input/user_007/rawtest3.xlsx
1,2019-01-02,x,100,2019-08-28 17:27:00.338582,input/user_007/rawtest3.xlsx
2,2019-01-03,x,400,2019-08-28 17:27:00.338582,input/user_007/rawtest3.xlsx


### Upload the dataframe into S3 bucket using boto3

In [18]:
# define target bucket to load the data
target_bucket='mst-lab-data'
target_object='output/'+up_filename
csv_buffer = io.StringIO()
cleandata.to_csv(csv_buffer, index=False)
s3_resource = boto3.resource('s3')
s3_resource.Object(target_bucket,target_object ).put(Body=csv_buffer.getvalue());

### Upload the dataframe into S3 bucket using s3fs library (a bit slower)

In [175]:
bytes_to_write = cleandata.to_csv(None,index=False ).encode()
fs = s3fs.S3FileSystem()
with fs.open('s3://blueprint-group-rawdata/subgroup_1-rawdata/robert_raw1_csv/'+up_filename, 'wb') as f:
    f.write(bytes_to_write)

### add date and time features using strftime ( http://strftime.org/ )

In [19]:
cleandata['Year']= pd.to_datetime(cleandata['date']).dt.strftime("%Y")
cleandata['Month']= pd.to_datetime(cleandata['date']).dt.strftime("%m")
# df['Month']=pd.to_numeric(df['Month'])
cleandata['Day']= pd.to_datetime(cleandata['date']).dt.strftime("%d")
# df['Day']=pd.to_numeric(df['Day'])
cleandata.head()

Unnamed: 0,date,site,prod,timestamp,useropluad,Year,Month,Day
0,2019-01-01,x,200,2019-08-28 17:27:00.338582,input/user_007/rawtest3.xlsx,2019,1,1
1,2019-01-02,x,100,2019-08-28 17:27:00.338582,input/user_007/rawtest3.xlsx,2019,1,2
2,2019-01-03,x,400,2019-08-28 17:27:00.338582,input/user_007/rawtest3.xlsx,2019,1,3


## 2. Sharepoint List Data Sources

___

<a> <img src='img\sharepoint_ingest_pic_v2.png' width="1000" /></a>
___

###  Configure setting information (sharepoint address, s3 target bucket, credential)

In [None]:
settings = {
    'URL': 'https://tmtgroup.sharepoint.com/sites/ABMDataPlatform',
    'USERNAME': 'user',
    'PASSWORD': 'password',
    'LIST': 'TR_TRIP',
    'S3_BUCKET': 'abm-data-platform-s3-raw'
}

### Import

In [None]:
from io import StringIO
import json
import pandas as pd
import boto3
from office365.runtime.auth.authentication_context import AuthenticationContext
from office365.sharepoint.client_context import ClientContext
from office365.runtime.utilities.request_options import RequestOptions
from settings import settings

ctxAuth = AuthenticationContext(url=settings['URL'])
if ctxAuth.acquire_token_for_user(username=settings['USERNAME'], password=settings['PASSWORD']):
    ctx = ClientContext(settings['URL'], ctxAuth)

Create initial list bucket to store list data from sharepoint

In [None]:
list_items_all = []

### Function to get field / column on sharepoint list

In [94]:
def sharepoint_get_fields(ctx):
    """Get fields
    Get fields name from a List
    """
    filter = "Hidden eq false and ReadOnlyField eq false and Group eq 'Custom Columns'"
    options = RequestOptions(
        "{0}/_api/lists/getbytitle('TR_TRIP')/fields?$filter={1}".format(settings['URL'], filter))
    options.set_header('Accept', 'application/json')
    options.set_header('Content-Type', 'application/json')
    data = ctx.execute_request_direct(options)
    fields = json.loads(data.content)
    return fields

### Function to get item or row data

In [95]:
def sharepoint_get_item(ctx, url=''):
    """Get items
    Get items from a List
    """
    if url == '':
        url = "{0}/_api/lists/getbytitle('TR_TRIP')/items".format(settings['URL'])
    options = RequestOptions(url)
    options.set_header('Accept', 'application/json')
    options.set_header('Content-Type', 'application/json')
    data = ctx.execute_request_direct(options)
    items = json.loads(data.content)
    for item in items['value']:
        list_items_all.append(item)

    # If list items more than 100 data
    if 'odata.nextLink' in items:
        print('[+] Get next items:', items['odata.nextLink'])
        sharepoint_get_item(ctx, items['odata.nextLink'])
    return list_items_all

### Function to read all data in sharepoint than transform to csv

In [96]:
def sharepoint_to_csv():
    """Export to CSV
    Export items to CSV file
    """
    ctxAuth = AuthenticationContext(url=settings['URL'])
    if ctxAuth.acquire_token_for_user(username=settings['USERNAME'], password=settings['PASSWORD']):
        ctx = ClientContext(settings['URL'], ctxAuth)

        # Populate fields name from List dynamically
        print('[+] Get fields name from a List')
        fieldId = []
        fieldTitle = []
        fields = sharepoint_get_fields(ctx)
        for field in fields['value']:
            fieldId.append(field['EntityPropertyName'])
            fieldTitle.append(field['Title'])

        # Populate data to array
        print('[+] Get items from a List')
        items = sharepoint_get_item(ctx)
        list_items = []
        for item in items:
            list_item = []
            for field in fieldId:
                list_item.append(item[field])
            list_items.append(list_item)

        print('[+] Total items:', len(list_items), ' item(s)')

        # Create pandas dataframe from populated data
        print('[+] Create pandas dataframe to CSV')
        df = pd.DataFrame(list_items, columns=fieldTitle)
        csv_buffer = StringIO()
        df.to_csv(csv_buffer, index=False)

        # Save CSV file to S3
        print('[+] Upload CSV to S3')
        object_url = 'bdd/raw/rawcsv_bdd_ds_tr_trip/TR_TRIP.csv'
        s3_resource = boto3.resource('s3')
        s3_resource.Object(settings['S3_BUCKET'], object_url).put(
            Body=csv_buffer.getvalue())
        print('[+] S3 Object URL:', settings['S3_BUCKET'] + object_url)
    else:
        print(ctxAuth.get_last_error())
    return

## 3. SQL DB Data Sources

___

<a> <img src='img\sqldb_ingest_pic_v2.png' width="1000" /></a>
___

## Will be shown on the demo