In [4]:
import boto3
import configparser
import time
import glob
import os
import pandas as pd
import json
import pyarrow.parquet as pq

# Loading all Params from dwh.cfh file

In [13]:
config = configparser.ConfigParser()
config.read('dwh.cfg')
KEY = config['AWS']['KEY']
SECRET = config['AWS']['SECRET']
REGION_NAME = config['AWS']['REGION_NAME']

DWH_CLUSTER_TYPE = config['DWH']['DWH_CLUSTER_TYPE']
DWH_NUM_NODES = config['DWH']['DWH_NUM_NODES']
DWH_NODE_TYPE = config['DWH']['DWH_NODE_TYPE']
DWH_IAM_ROLE_NAME = config['DWH']['DWH_IAM_ROLE_NAME']
DWH_CLUSTER_IDENTIFIER = config['DWH']['DWH_CLUSTER_IDENTIFIER']
DWH_DB = config['DWH']['DWH_DB']
DWH_DB_USER = config['DWH']['DWH_DB_USER']
DWH_DB_PASSWORD = config['DWH']['DWH_DB_PASSWORD']
DWH_PORT = config['DWH']['DWH_PORT']

pd.DataFrame({"Param":
              ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
            [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
})

Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,multi-node
1,DWH_NUM_NODES,4
2,DWH_NODE_TYPE,t2.micro
3,DWH_CLUSTER_IDENTIFIER,dwhCluster
4,DWH_DB,dwh
5,DWH_DB_USER,dwhuser
6,DWH_DB_PASSWORD,Passw0rd
7,DWH_PORT,5439
8,DWH_IAM_ROLE_NAME,test_iam_role


# Creating clients for S3, EC2, IAM, redshift

In [14]:
# create IAM client
iam  = boto3.client('iam',aws_access_key_id = KEY, aws_secret_access_key = SECRET, region_name = REGION_NAME )

# create s3 client
s3  = boto3.client('s3',aws_access_key_id = KEY, aws_secret_access_key = SECRET, region_name = REGION_NAME )
# s3_rsc  = boto3.resource('s3',aws_access_key_id = KEY, aws_secret_access_key = SECRET, region_name = REGION_NAME )

#create EC2 client
ec2  = boto3.client('ec2',aws_access_key_id = KEY, aws_secret_access_key = SECRET, region_name = REGION_NAME )

# create redshift client
redshift  = boto3.client('redshift',aws_access_key_id = KEY, aws_secret_access_key = SECRET, region_name = REGION_NAME )

# Create IAM role that makes Redshift able to access S3 buckets

In [15]:
try:
  print('create iam role')
  dwhRole = iam.create_role(
     Path= '/',
     RoleName = DWH_IAM_ROLE_NAME,
     Description = 'create role iam for project ',
     AssumeRolePolicyDocument = json.dumps(
        {'Statement' : [{'Action': 'sts:AssumeRole',
                         'Effect': 'Allow',
                         'Principal': {'Service': 'redshift.amazonaws.com'}}],
          'Version': '2012-10-17'})
                         )
except Exception as e:
  print(e)

print("Attaching Policy")

try:
    iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']
    print("Get the IAM role ARN")
    roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']
    print(roleArn)
except Exception as e:
    print(e)

create iam role
An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name test_iam_role already exists.
Attaching Policy
Get the IAM role ARN
arn:aws:iam::590183675741:role/test_iam_role


# create s3 bucket unprocessed 

In [16]:
# def bucket_s3_exists(s3_rsc, b):
#   s3_rsc = boto3.resource('s3')
#   return s3_rsc.Bucket(b) in s3_rsc.buckets.all()

def create_s3_bucket(s3, b, folders):
  s3.create_bucket(Bucket = b, CreateBucketConfiguration={'LocationConstraint': 'ap-southeast-2'})
  if folders !=  ' ':
    fls = folders.split(',')
    for f in fls:
      s3.put_object(Bucket = b, Body = ' ', Key = f + '/')

tt = time.time_ns()
prefix = f'ny-taxi-bucket-s3-{tt}'
bucket_names = {prefix: 'unprocessed_reports, rides, locations'}

# creating s3 buckets.....................................
for k in bucket_names:
  create_s3_bucket(s3, k, bucket_names[k])

print('S3 buckets created')

S3 buckets created


In [6]:
path_data_yellow = r'D:\DE\ny_taxi_etl_aws\ny_taxi\yellow_tripdata_2024-01.parquet'
# check metadata
print("metadata")
print(pq.read_metadata(path_data_yellow))

metadata
<pyarrow._parquet.FileMetaData object at 0x0000024A134D0F90>
  created_by: parquet-cpp-arrow version 14.0.2
  num_columns: 19
  num_rows: 2964624
  num_row_groups: 3
  format_version: 2.6
  serialized_size: 6357


In [14]:
file = pq.ParquetFile(path_data_yellow)
table = file.read()
table.schema

VendorID: int32
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: int64
trip_distance: double
RatecodeID: int64
store_and_fwd_flag: large_string
PULocationID: int32
DOLocationID: int32
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
congestion_surcharge: double
Airport_fee: double

In [15]:
batches_iter = file.iter_batches(batch_size=10)
df = next(batches_iter).to_pandas()
df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
0,2,2024-01-01 00:57:55,2024-01-01 01:17:43,1,1.72,1,N,186,79,2,17.7,1.0,0.5,0.0,0.0,1.0,22.7,2.5,0.0
1,1,2024-01-01 00:03:00,2024-01-01 00:09:36,1,1.8,1,N,140,236,1,10.0,3.5,0.5,3.75,0.0,1.0,18.75,2.5,0.0
2,1,2024-01-01 00:17:06,2024-01-01 00:35:01,1,4.7,1,N,236,79,1,23.3,3.5,0.5,3.0,0.0,1.0,31.3,2.5,0.0
3,1,2024-01-01 00:36:38,2024-01-01 00:44:56,1,1.4,1,N,79,211,1,10.0,3.5,0.5,2.0,0.0,1.0,17.0,2.5,0.0
4,1,2024-01-01 00:46:51,2024-01-01 00:52:57,1,0.8,1,N,211,148,1,7.9,3.5,0.5,3.2,0.0,1.0,16.1,2.5,0.0
5,1,2024-01-01 00:54:08,2024-01-01 01:26:31,1,4.7,1,N,148,141,1,29.6,3.5,0.5,6.9,0.0,1.0,41.5,2.5,0.0
6,2,2024-01-01 00:49:44,2024-01-01 01:15:47,2,10.82,1,N,138,181,1,45.7,6.0,0.5,10.0,0.0,1.0,64.95,0.0,1.75
7,1,2024-01-01 00:30:40,2024-01-01 00:58:40,0,3.0,1,N,246,231,2,25.4,3.5,0.5,0.0,0.0,1.0,30.4,2.5,0.0
8,2,2024-01-01 00:26:01,2024-01-01 00:54:12,1,5.44,1,N,161,261,2,31.0,1.0,0.5,0.0,0.0,1.0,36.0,2.5,0.0
9,2,2024-01-01 00:28:08,2024-01-01 00:29:16,1,0.04,1,N,113,113,2,3.0,1.0,0.5,0.0,0.0,1.0,8.0,2.5,0.0


In [18]:
batches_iter = file.iter_batches(batch_size=1000)
df = next(batches_iter).to_pandas()
df
df.to_csv('test_yellow_trip.csv', index=True)

data =r'D:\DE\ny_taxi_etl_aws\ny_taxi\taxi_zone_lookup.csv'
# Setting the id column as the index
airbnb_data = pd.read_csv(data)
# airbnb_data = pd.read_csv("data/listings_austing.csv", index_col=0)

# Preview first 5 rows
airbnb_data.head(800)

Unnamed: 0,LocationID,Borough,Zone,service_zone
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,4,Manhattan,Alphabet City,Yellow Zone
4,5,Staten Island,Arden Heights,Boro Zone
...,...,...,...,...
260,261,Manhattan,World Trade Center,Yellow Zone
261,262,Manhattan,Yorkville East,Yellow Zone
262,263,Manhattan,Yorkville West,Yellow Zone
263,264,Unknown,,


# upload file to s3 

In [26]:
def upload_files_to_s3(s3, file_name, b, folder, object_name, args=None):
  if object_name is None:
    object_name = folder + "/{fname}".format(fname = os.path.basename(file_name))

  response = s3.upload_file(file, b, object_name, ExtraArgs=None)
  return response

  # t_start = time.time()
  # count = 0
  # for batch in file.iter_batches(batch_size=100):
  #   count += 1
  #   batch_df = batch.to_pandas()
  #   print(f'inserting batch {count}...')
  #   b_start = time.time()
  #   response = s3.upload_file(batch_df, b, object_name, ExtraArgs=None)
  #   print(response)
  #   b_end = time.time()
  #   print(f'inserted! time taken {b_end-b_start:10.3f} seconds.\n')
  # t_end = time.time()
  # print(f'Completed! Total time taken was {t_end-t_start:10.3f} seconds for {count} batches.')


print('Uploading the local receipts files to ny-taxi-bciket-s3 AWS S3 bucket...')
file = 'test_yellow_trip.csv'
print('upload_file:', file)
print(upload_files_to_s3(s3, file, prefix, 'unprocessed_reports', None))

print(f'Files uploaded to {prefix} AWS S3 bucket')
print(f'ID: {tt}')

Uploading the local receipts files to ny-taxi-bciket-s3 AWS S3 bucket...
upload_file: test_yellow_trip.csv
None
Files uploaded to ny-taxi-bucket-s3-1736431835735791800 AWS S3 bucket
ID: 1736431835735791800


In [None]:
# ny-taxi-bucket-s3-1736431835735791800

# Creating redshift cluster

In [51]:
try:
  response = redshift.create_cluster(
    ClusterType = DWH_CLUSTER_TYPE,
    NodeType = DWH_NODE_TYPE,
    NumberOfNodes = int(DWH_NUM_NODES),

# <-------------------------------------------------------------->

  DBName = DWH_DB,
  ClusterIdentifier = DWH_CLUSTER_IDENTIFIER,
  MasterUsername = DWH_DB_USER,
  MasterUserPassword = DWH_DB_PASSWORD,


  IamRoles = [roleArn]
  )
except Exception as e:
  print(e)

An error occurred (InvalidParameterValue) when calling the CreateCluster operation: Invalid node type: t2.micro


In [52]:

def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

ClusterNotFoundFault: An error occurred (ClusterNotFound) when calling the DescribeClusters operation: Cluster dwhcluster not found.

In [None]:
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)