<a href="https://colab.research.google.com/github/ebo7/data_pipeline/blob/main/E2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Automating Weekly Data Ingestion from GHR

# TASK 1 & 2

## Resources
The following resources were used to emulate the problem setup:
1. AWS Transfer Family - SFTP server
 * Server ID: s-1dab728b35294a138
 * The server hosts the weekly data at s3://revelio-labs-demo/sftp
2. Persistent EBS volume
 * Volume ID: vol-0fbb4f932e366ade5
3. AWS Data Pipeline (ADP)
4. S3 bucket
 * S3 URI: s3://revelio-labs-demo
 * ADP ingests data from s3://revelio-labs-demo/new
 * After ADP transfers the data to Redshift, it is moved to s3://revelio-labs-demo/old, which stores all previous data
5. Redshift cluster - Cluster ID: revelio-labs-redshift-cluster

(The EC2 instance was started and terminated automatically by AWS Data Pipeline.)


## Overview
A. Manually create the tables and the view on Redshift

B. AWS Data Pipeline (after step A)
1. Start EC2
2. Attach and mount the persistent Elastic Block Store (EBS) volume, which stores the private key for SFTP
3. Mount the GHR directory on EC2 using sshfs
4. Copy directories that were modified within the past week to s3://revelio-labs-demo/new/
5. Copy the new data to Redshift

For the first week only, the step B script is modified to include all directories.
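
The weekly selection in step B.4, together with the first-week exception, can be sketched as a single helper. This is a minimal sketch and not the script the pipeline runs: the function name `list_dirs_to_ingest` and its `first_run` flag are hypothetical, and it assumes GNU `find`.

```shell
# Hypothetical helper: list the subdirectories of a mounted GHR root to ingest.
# Usage: list_dirs_to_ingest <root> [first_run]
#   first_run=1 -> every subdirectory (first week only)
#   otherwise   -> only subdirectories modified within the last 7 days
list_dirs_to_ingest() {
  root="$1"
  first_run="${2:-0}"
  if [ "$first_run" = "1" ]; then
    (cd "$root" && find . -mindepth 1 -type d | cut -c3- | sort)
  else
    (cd "$root" && find . -mindepth 1 -type d -mtime -7 | cut -c3- | sort)
  fi
}
```

Keeping the age filter behind a flag means the first-week run does not need a separately edited copy of the script.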

## Step A: Create tables and view

In [None]:
-- master
CREATE TABLE master(job_id CHAR(9) PRIMARY KEY DISTKEY SORTKEY, company VARCHAR(150), post_date TIMESTAMP, salary INTEGER, city VARCHAR(100));
-- title
CREATE TABLE title(job_id CHAR(9) PRIMARY KEY DISTKEY SORTKEY, title VARCHAR(100));
-- timelog
CREATE TABLE timelog(job_id CHAR(9) PRIMARY KEY DISTKEY SORTKEY, remove_date TIMESTAMP);

-- view
-- The view is assumed to be based on master
CREATE VIEW posting_current AS
(
  SELECT master.job_id, city, company, title, salary, post_date, remove_date
  FROM master
  LEFT JOIN title ON master.job_id = title.job_id
  LEFT JOIN timelog ON master.job_id = timelog.job_id
);

## Step B: AWS Data Pipeline

![alt](https://drive.google.com/uc?export=view&id=1pCwRuAzOddvlaCBRbL9my_8kGGs_osGk)
(Detailed)

![alt](https://drive.google.com/uc?export=view&id=1WEl0rtSzfBOrWsYyaamRKvj1TK1Oymnx)
(Brief)

* Shell 1: Copy new data from GHR to the Revelio Labs S3 bucket
* Redshift: Load the data into the tables
* Shell 2: Clean up


### Shell 1 (Amazon Linux)
The default EC2 image used by ADP is Amazon Linux.

In [None]:
# Install sshfs
sudo yum-config-manager --enable epel
sudo yum -y install sshfs


# Attach EBS Volume
aws configure set default.region us-east-1
INSTANCE_ID=$(curl -s http://169.254.169.254/latest/meta-data/instance-id/)
VOL_ID=vol-0fbb4f932e366ade5
aws ec2 attach-volume --device /dev/sdf --volume-id "$VOL_ID" --instance-id "$INSTANCE_ID"
# Block until attachment is finished (the volume attached as /dev/sdf shows up as /dev/xvdf here)
while [ ! -b "$(readlink -f /dev/xvdf)" ]
  do echo "waiting for device /dev/xvdf"
  sleep 5
done
# Mount EBS Volume
sudo mkdir -p /root/ebs-mnt
sudo mount /dev/xvdf /root/ebs-mnt


# Mount GHR directory
USER=demo
ID_PATH=/root/ebs-mnt/private-key
# The trailing colon tells sshfs to mount the remote default directory
SERVER=s-1dab728b35294a138.server.transfer.us-east-1.amazonaws.com:
mkdir -p ghr-mnt
sudo chmod 400 "$ID_PATH"
# On Debian/Ubuntu, sshfs would be installed with apt-get instead of yum:
# sudo apt-get update
# sudo apt-get install sshfs
sudo sshfs -o StrictHostKeyChecking=no,allow_other,IdentityFile="$ID_PATH" "$USER@$SERVER" ./ghr-mnt


# Copy new directories to s3/old; keep the GHR directory structure
BUCKET=s3://revelio-labs-demo
cd ghr-mnt || exit 1
find . -mindepth 1 -type d -mtime -7 | cut -c3- | while read -r line; do aws s3 cp --recursive "$line" "$BUCKET/old/$line"; done


# Copy new files to s3/new, which is ingested by ADP. The directory structure is regrouped by table for ADP
find . -mtime -7 -type f -regex '\./.*/master.*' | while read -r line; do aws s3 cp "$line" "$BUCKET/new/master/"; done
find . -mtime -7 -type f -regex '\./.*/timelog.*' | while read -r line; do aws s3 cp "$line" "$BUCKET/new/timelog/"; done
find . -mtime -7 -type f -regex '\./.*/title.*' | while read -r line; do aws s3 cp "$line" "$BUCKET/new/title/"; done

# IMPROVE: Copy directories only once to s3/new and after ADP ingestion move to s3/old 
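
One way the IMPROVE note could be realized is to upload only to `new/` and, once the Redshift copies have finished, move everything to `old/` in a single step instead of deleting it. The sketch below is an assumption-laden illustration, not part of the pipeline: `archive_ingested` and `AWS_CLI` are hypothetical names, and the dated `old/<run-date>/` prefix is an assumed layout chosen so that files with the same name from different weeks do not overwrite each other. It does not preserve the original GHR directory structure under `old/`.

```shell
# Assumption: AWS_CLI can be set to `echo` for a dry run; by default the real CLI is used.
AWS_CLI="${AWS_CLI:-aws}"
BUCKET="${BUCKET:-s3://revelio-labs-demo}"

# Move everything staged under new/ to a dated prefix under old/.
# Usage: archive_ingested [run-date]   (run-date defaults to today, YYYY-MM-DD)
archive_ingested() {
  run_date="${1:-$(date +%F)}"
  $AWS_CLI s3 mv --recursive "$BUCKET/new/" "$BUCKET/old/$run_date/"
}
```

Under this approach the move would run in the cleanup activity, after all three Redshift copies, and would take the place of both the upload to `old/` and the later removal of `new/`.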

### Redshift Copy
Redshift Copy takes:
* an input (S3 data node) and an output (Redshift table)
* command options
* an insert mode

Redshift Copy is handled by ADP; the details are in the JSON at the end.
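
For orientation, the load that each Redshift Copy activity performs is roughly comparable to a Redshift `COPY` statement with the command options listed in the JSON. The sketch below only prints such a statement. The function name `copy_sql` and the IAM role argument are assumptions; ADP builds and runs its own statement, and the insert mode (`OVERWRITE_EXISTING`) is applied by ADP rather than by `COPY` itself.

```shell
# Hypothetical helper: print a COPY statement using the command options from the pipeline.
# Usage: copy_sql <table> <s3-prefix> <iam-role-arn>
copy_sql() {
  table="$1"
  s3_prefix="$2"
  iam_role="$3"   # assumption: a role that Redshift may use to read the bucket
  cat <<SQL
COPY ${table}
FROM '${s3_prefix}'
IAM_ROLE '${iam_role}'
CSV
IGNOREHEADER 1
ACCEPTINVCHARS
TRUNCATECOLUMNS;
SQL
}
```

Printing the statement for `title` with `s3://revelio-labs-demo/new/title` shows how the input node, output table and command options fit together.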

### Shell 2

In [None]:
# Clean up
BUCKET=s3://revelio-labs-demo
INSTANCE_ID=$(curl -s http://169.254.169.254/latest/meta-data/instance-id/)
VOL_ID=vol-0fbb4f932e366ade5
# Remove the files staged for ingestion
aws s3 rm --recursive "$BUCKET/new"
cd ..
# Unmount the GHR directory and the EBS volume, then detach the volume
sudo umount ghr-mnt
sudo umount /root/ebs-mnt
aws ec2 detach-volume --device /dev/sdf --volume-id "$VOL_ID" --instance-id "$INSTANCE_ID"



## Choosing between AWS Services
* Running ADP starts EC2 instances on demand each week, which could reduce cost.
* With a temporary EC2 instance, the private key must be stored outside the instance's own storage. The two external storage choices are S3 and EBS, which give three options:
 1. Option 1: Copy from S3: Repeatedly copying the private key from S3 to EC2 might pose security risks.
 2. Option 2: Mount S3 virtually with s3fs: The GHR directory is mounted with sshfs, which requires an IdentityFile. Providing the private key from a mounted S3 filesystem did not work, and debugging it needed more exploration.
 3. Option 3: An IdentityFile on a persistent EBS volume was compatible with mounting the GHR directory on EC2.
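
Whichever storage option holds the key, a quick pre-flight check before calling sshfs can make failures easier to diagnose, since OpenSSH refuses private keys that are readable by group or others. The sketch below is a hypothetical helper (`check_identity_file` is not part of the pipeline) and assumes GNU `stat`. Whether file permissions were the cause of the s3fs problem in Option 2 is not confirmed by this write-up.

```shell
# Hypothetical pre-flight check for the sshfs IdentityFile.
# Succeeds only if the file exists and its mode is 400 or 600.
check_identity_file() {
  key="$1"
  if [ ! -f "$key" ]; then
    echo "missing: $key"
    return 1
  fi
  # stat -c '%a' prints the permission bits in octal (GNU coreutils)
  mode=$(stat -c '%a' "$key")
  case "$mode" in
    400|600)
      echo "ok: $key ($mode)"
      ;;
    *)
      echo "unexpected mode: $key ($mode)"
      return 1
      ;;
  esac
}
```

In Shell 1 this would be called with `/root/ebs-mnt/private-key` after the `chmod 400` step.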

# TASK 3

In [None]:
-- Create a temporary table listing every month since 2014-01 as a YYYYMM integer
CREATE OR REPLACE PROCEDURE p1() AS $$
DECLARE
  loop_var int;
  loop_end int:=CAST(FLOOR(MONTHS_BETWEEN(GETDATE(), '2014-01-01')) AS INTEGER);
BEGIN
  DROP TABLE if exists months;
  CREATE  TEMP TABLE months(month int);
    FOR loop_var IN 0..loop_end LOOP
        insert into months values(CAST(SUBSTRING(DATEADD(month, loop_var,'2014-01-01'),1,4) + SUBSTRING(DATEADD(month, loop_var,'2014-01-01'), 6,2) AS INTEGER));
    END LOOP;
END;
$$ LANGUAGE plpgsql;

CALL p1();

-- Checked whether each output value is 0 or NULL: a value that is 0 (not NULL) is displayed as empty on Redshift.
-- The integer 0 can be forced to display as the character '0' when the sum is 0 and not NULL.
SELECT
  SUBSTRING(CAST(month AS CHAR(6)),1,4)+'-'+SUBSTRING(CAST(month AS CHAR(6)),5,2) AS month,
  SUM(is_new) count_new,
  SUM(is_removed) count_removed,
  SUM(is_active) count_active,
  AVG(salary_new) salary_new,
  AVG(salary_removed) salary_removed,
  AVG(salary_active) salary_active
FROM
(
  SELECT 
      month,
      post_date,
      CASE WHEN CAST(SUBSTRING(post_date,1,4) + SUBSTRING(post_date, 6,2) AS INTEGER)=month 
                THEN 1
                ELSE 0 
      END AS is_new,
      CASE WHEN CAST(SUBSTRING(remove_date,1,4) + SUBSTRING(remove_date, 6,2) AS INTEGER)=month 
                THEN 1
                ELSE 0 
      END AS is_removed,
      CASE WHEN (CAST(SUBSTRING(remove_date,1,4) + SUBSTRING(remove_date, 6,2) AS INTEGER)>=month AND CAST(SUBSTRING(post_date,1,4) + SUBSTRING(post_date, 6,2) AS INTEGER)<=month) 
                THEN 1
                ELSE 0 
      END AS is_active,
      CASE WHEN is_new=1
          THEN salary
          ELSE null
      END AS salary_new,
      CASE WHEN is_removed=1
          THEN salary
          ELSE null
      END AS salary_removed,
      CASE WHEN is_active=1
          THEN salary
          ELSE null
      END AS salary_active
  FROM months, posting_current
) AS monthly_flags
GROUP BY month;
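
To make the month logic easier to follow outside Redshift, the sketch below mirrors two pieces of the query in plain shell: the YYYYMM list that `p1()` builds, and the three per-row flags computed by the `CASE` expressions. The function names `gen_months` and `ym_flags` are hypothetical, and an empty remove month stands in for a NULL `remove_date`.

```shell
# Print one YYYYMM integer per month from a start month to an end month, inclusive.
# Usage: gen_months <start-year> <start-month> <end-year> <end-month>
gen_months() {
  y="$1"
  m="${2#0}"        # drop a leading zero so "08" is not read as octal
  end_y="$3"
  end_m="${4#0}"
  while [ $((y * 100 + m)) -le $((end_y * 100 + end_m)) ]; do
    printf '%d%02d\n' "$y" "$m"
    m=$((m + 1))
    if [ "$m" -gt 12 ]; then
      m=1
      y=$((y + 1))
    fi
  done
}

# Print "is_new is_removed is_active" for one posting and one month (all YYYYMM).
# Usage: ym_flags <post-month> <remove-month-or-empty> <month>
ym_flags() {
  post="$1"
  remove="$2"
  month="$3"
  is_new=0
  is_removed=0
  is_active=0
  if [ "$post" -eq "$month" ]; then
    is_new=1
  fi
  # Comparisons against a NULL remove_date fall through to ELSE 0 in the query
  if [ -n "$remove" ]; then
    if [ "$remove" -eq "$month" ]; then
      is_removed=1
    fi
    if [ "$post" -le "$month" ] && [ "$remove" -ge "$month" ]; then
      is_active=1
    fi
  fi
  echo "$is_new $is_removed $is_active"
}
```

For example, `ym_flags 201401 201403 201402` prints `0 0 1`, while `ym_flags 201401 "" 201401` prints `1 0 0`: as the query is written, a posting with no `remove_date` is never counted as active.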


## ADP JSON

In [None]:
{
  "objects": [
    {
      "directoryPath": "s3://revelio-labs-demo/new/timelog",
      "name": "s3 - timelog",
      "id": "DataNodeId_h710s",
      "type": "S3DataNode"
    },
    {
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "name": "ec2",
      "id": "ResourceId_bc0wq",
      "type": "Ec2Resource",
      "availabilityZone": "us-east-1b"
    },
    {
      "database": {
        "ref": "RedshiftDatabaseId_mT4AA"
      },
      "name": "table - timelog",
      "id": "DataNodeId_TFhdv",
      "type": "RedshiftDataNode",
      "tableName": "timelog"
    },
    {
      "output": {
        "ref": "DataNodeId_cWFVd"
      },
      "input": {
        "ref": "DataNodeId_spAxU"
      },
      "dependsOn": {
        "ref": "ShellCommandActivityId_RCZNF"
      },
      "name": "Redshift Copy: title",
      "commandOptions": "csv\nignoreheader 1\nACCEPTINVCHARS\nTRUNCATECOLUMNS",
      "runsOn": {
        "ref": "ResourceId_bc0wq"
      },
      "id": "RedshiftCopyActivityId_rBhBZ",
      "type": "RedshiftCopyActivity",
      "insertMode": "OVERWRITE_EXISTING"
    },
    {
      "database": {
        "ref": "RedshiftDatabaseId_mT4AA"
      },
      "name": "table - master",
      "id": "DataNodeId_3KDbt",
      "type": "RedshiftDataNode",
      "tableName": "master"
    },
    {
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "scheduleType": "ONDEMAND",
      "name": "Default",
      "id": "Default"
    },
    {
      "name": "Shell 1",
      "runsOn": {
        "ref": "ResourceId_bc0wq"
      },
      "id": "ShellCommandActivityId_RCZNF",
      "type": "ShellCommandActivity",
      "command": "# Install sshfs\nsudo yum-config-manager --enable epel\nsudo yum -y install sshfs\n\n\n# Attach EBS Volume\naws configure set default.region us-east-1\nINSTANCE_ID=$(curl http://169.254.169.254/latest/meta-data/instance-id/)\nVOL_ID=vol-0fbb4f932e366ade5\naws ec2 attach-volume --device /dev/sdf --volume-id $VOL_ID --instance-id $INSTANCE_ID\n# Block until attachment is finished\nwhile [ ! -b $(readlink -f /dev/xvdf) ]\n  do echo \"waiting for device /dev/xvdf\" \n  sleep 5\ndone\n# Mount EBS Volume\nsudo mkdir /root/ebs-mnt\nsudo mount /dev/sdf /root/ebs-mnt\n\n\n# Mount GHR directory\nUSER=demo\nID_PATH=/root/ebs-mnt/private-key\nSERVER=s-1dab728b35294a138.server.transfer.us-east-1.amazonaws.com:\nmkdir ghr-mnt\nsudo chmod 400 $ID_PATH\n#install sshfs\n# sudo apt-get update\n# sudo apt-get install sshfs\nsudo sshfs -o StrictHostKeyChecking=no,allow_other,IdentityFile=$ID_PATH $USER@$SERVER ./ghr-mnt\n\n\n# Copy new directories to s3/old; keep the GHR directory structure\nBUCKET=s3://revelio-labs-demo\ncd ghr-mnt\nfind . -mindepth 1 -type d -mtime -7 | cut -c3- | while read line; do aws s3 cp --recursive $line $BUCKET/old/$line; done\n\n\n# Copy new directories to S3/new, which is ingested by ADP. Directory structure is modified for ADP\nfind . -mtime -7 -type f -regex '\\./.*/master.*' | while read line; do aws s3 cp $line $BUCKET/new/master/; done\nfind . -mtime -7 -type f -regex '\\./.*/timelog.*' | while read line; do aws s3 cp $line $BUCKET/new/timelog/; done\nfind . -mtime -7 -type f -regex '\\./.*/title.*' | while read line; do aws s3 cp $line $BUCKET/new/title/; done\n\n# IMPROVE: Copy directories only once to s3/new and after ADP ingestion move to s3/old "
    },
    {
      "*password": "Revelio-labs-admin1",
      "name": "DefaultRedshiftDatabase1",
      "id": "RedshiftDatabaseId_mT4AA",
      "clusterId": "revelio-labs-redshift-cluster",
      "type": "RedshiftDatabase",
      "username": "revelio-labs-admin"
    },
    {
      "output": {
        "ref": "DataNodeId_TFhdv"
      },
      "input": {
        "ref": "DataNodeId_h710s"
      },
      "dependsOn": {
        "ref": "ShellCommandActivityId_RCZNF"
      },
      "name": "Redshift Copy: timelog",
      "commandOptions": "csv\nignoreheader 1\nACCEPTINVCHARS\nTRUNCATECOLUMNS",
      "runsOn": {
        "ref": "ResourceId_bc0wq"
      },
      "id": "RedshiftCopyActivityId_yRtTA",
      "type": "RedshiftCopyActivity",
      "insertMode": "OVERWRITE_EXISTING"
    },
    {
      "directoryPath": "s3://revelio-labs-demo/new/master",
      "name": "s3 - master",
      "id": "DataNodeId_SZs4z",
      "type": "S3DataNode"
    },
    {
      "dependsOn": [
        {
          "ref": "RedshiftCopyActivityId_XbujI"
        },
        {
          "ref": "RedshiftCopyActivityId_rBhBZ"
        },
        {
          "ref": "RedshiftCopyActivityId_yRtTA"
        }
      ],
      "name": "Shell 2",
      "runsOn": {
        "ref": "ResourceId_bc0wq"
      },
      "id": "ShellCommandActivityId_OyvMn",
      "type": "ShellCommandActivity",
      "command": "# clean up\nBUCKET=s3://revelio-labs-demo\nINSTANCE_ID=$(curl http://169.254.169.254/latest/meta-data/instance-id/)\nVOL_ID=vol-0fbb4f932e366ade5\naws s3 rm --recursive $BUCKET/new\ncd ..\nsudo umount ghr-mnt\nsudo umount /root/ebs-mnt\naws ec2 detach-volume --device /dev/sdf --volume-id $VOL_ID --instance-id $INSTANCE_ID\n\n"
    },
    {
      "database": {
        "ref": "RedshiftDatabaseId_mT4AA"
      },
      "name": "table - title",
      "id": "DataNodeId_cWFVd",
      "type": "RedshiftDataNode",
      "tableName": "title"
    },
    {
      "directoryPath": "s3://revelio-labs-demo/new/title",
      "name": "s3 - title",
      "id": "DataNodeId_spAxU",
      "type": "S3DataNode"
    },
    {
      "output": {
        "ref": "DataNodeId_3KDbt"
      },
      "input": {
        "ref": "DataNodeId_SZs4z"
      },
      "dependsOn": {
        "ref": "ShellCommandActivityId_RCZNF"
      },
      "name": "Redshift Copy: master",
      "commandOptions": "csv\nignoreheader 1\nACCEPTINVCHARS\nTRUNCATECOLUMNS",
      "runsOn": {
        "ref": "ResourceId_bc0wq"
      },
      "id": "RedshiftCopyActivityId_XbujI",
      "type": "RedshiftCopyActivity",
      "insertMode": "OVERWRITE_EXISTING"
    }
  ],
  "parameters": []
}