###### Project: Building an ETL Pipeline for Covid-19 Datasets on AWS using AWS S3, Glue, Athena and Redshift.

###### Project Overview
The project aims to build an ETL Pipeline using AWS data services on Covid-19 datsets publicly available on AWS registry for analytical purposes. To accomplish the project, different AWS services such as AWS S3, AWS Glue, AWS Athena, AWS Redshift, Glue Jobs and Python have been used.

###### Project Structure
    1. Extracting Covid-19 datasets and understanding the data
    2. Preparing data for AWS S3
    3. Crawling table schema using Glue Crawler and Building tables to be used in Athena
    4. Analysing data tables in Athena
    5. Building and drawing data models
    6. Performing required transformation on datasets
    7. Copying data to AWS Redshift using COPY command
    8. And finally deploying the full python script on AWS using Glue Jobs to copy data into Redshift for analytics.
    

![Capture.PNG](attachment:Capture.PNG)

###### Datasets
Datasets used in the project have collected from AWS Public Registry solely for educational puropose.

###### Dataset_1: enigma_jhu
Tracks confirmed COVID-19 cases in provinces, states, and countries across the world with a breakdown to the county level in the US.

###### Dataset_2: nytimes_states
###### Dataset_3 nytimes_counties
Track confirmed cases and deaths in the US by state and county.

###### Dataset_4: covid_testing_states_daily
###### Dataset_5: covid_testing_us_daily
###### Dataset_6: covid_testing_us_total
Track the number of people tested, pending tests, and positive and negative tests for COVID-19.

###### Dataset_7: hospital_beds
Data on hospital beds and their utilization in the US.
 
###### Dataset_8: country_codes
###### Dataset_9: county_populations
###### Dataset_10: us_state_abbreviations
Lookup tables to support visualizations.

In [1]:
import boto3
import pandas as pd
from io import StringIO
import time

In [2]:
AWS_ACCESS_KEY = 'AKIA3MAKVSBDKGEJPUXK'
AWS_SECRET_KEY = '.....'
AWS_REGION = 'us-east-1'
SCHEMA_NAME = 'covid19db'
S3_STAGING_DIR = 's3://covid19-data-bucket/output/'
S3_BUCKET_NAME = 'covid19-data-bucket'
S3_OUTPUT_DIRECTORY = 'output'

In [3]:
athena_client = boto3.client(
                            'athena',
                            aws_access_key_id = AWS_ACCESS_KEY,
                            aws_secret_access_key = AWS_SECRET_KEY,
                            region_name = AWS_REGION,
                            )

###### Table Schema Design

![covidDataSchema.drawio.png](attachment:covidDataSchema.drawio.png)

###### Fact and Dimension Table Design

![Dimension_Model.drawio.png](attachment:Dimension_Model.drawio.png)

In [25]:
Dict = {}
def download_and_load_query_results(client:boto3.client,query_response:Dict) -> pd.DataFrame:
    while True:
        try:
            client.get_query_results(QueryExecutionId=query_response['QueryExecutionId'])
            break
        except Exception as e:
            if 'not yet finished' in str(e):
                time.sleep(0.001)
            else:
                raise e
    temp_file_location: str = 'athena_query_results.csv'
        
    s3_client = boto3.client(
                            's3',
                            aws_access_key_id = AWS_ACCESS_KEY,
                            aws_secret_access_key = AWS_SECRET_KEY,
                            region_name = AWS_REGION,
                            )
    
    s3_client.download_file(
                            S3_BUCKET_NAME,
                            f"{S3_OUTPUT_DIRECTORY}/{query_response['QueryExecutionId']}.csv",
                            temp_file_location,
                           )
    return pd.read_csv(temp_file_location)

In [5]:
# Querying Table enigma_jhud
response = athena_client.start_query_execution(
                        QueryString='Select * From enigma_jhud',
                        QueryExecutionContext={'Database': SCHEMA_NAME},
                        ResultConfiguration={
                            'OutputLocation': S3_STAGING_DIR,
                            'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}
                        }
                    )
enigma_jhud = download_and_load_query_results(athena_client, response)

In [6]:
# Querying Table nytimes_data_in_usa_us_county
response = athena_client.start_query_execution(
                        QueryString='Select * From nytimes_data_in_usa_us_county',
                        QueryExecutionContext={'Database': SCHEMA_NAME},
                        ResultConfiguration={
                            'OutputLocation': S3_STAGING_DIR,
                            'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}
                        }
                    )
nytimes_data_in_usa_us_county = download_and_load_query_results(athena_client, response)

In [7]:
# Querying Table nytimes_data_in_usa_us_states
response = athena_client.start_query_execution(
                        QueryString='Select * From nytimes_data_in_usa_us_states',
                        QueryExecutionContext={'Database': SCHEMA_NAME},
                        ResultConfiguration={
                            'OutputLocation': S3_STAGING_DIR,
                            'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}
                        }
                    )
nytimes_data_in_usa_us_states = download_and_load_query_results(athena_client, response)

In [8]:
# Querying Table rearc_usa_hospital_beds
response = athena_client.start_query_execution(
                        QueryString='Select * From rearc_usa_hospital_beds',
                        QueryExecutionContext={'Database': SCHEMA_NAME},
                        ResultConfiguration={
                            'OutputLocation': S3_STAGING_DIR,
                            'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}
                        }
                    )
rearc_usa_hospital_beds = download_and_load_query_results(athena_client, response)

In [9]:
# Querying Table static_datasets_countrycode
response = athena_client.start_query_execution(
                        QueryString='Select * From static_datasets_countrycode',
                        QueryExecutionContext={'Database': SCHEMA_NAME},
                        ResultConfiguration={
                            'OutputLocation': S3_STAGING_DIR,
                            'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}
                        }
                    )
static_datasets_countrycode = download_and_load_query_results(athena_client, response)

In [10]:
# Querying Table static_datasets_countypopulation
response = athena_client.start_query_execution(
                        QueryString='Select * From static_datasets_countypopulation',
                        QueryExecutionContext={'Database': SCHEMA_NAME},
                        ResultConfiguration={
                            'OutputLocation': S3_STAGING_DIR,
                            'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}
                        }
                    )
static_datasets_countypop = download_and_load_query_results(athena_client, response)

In [11]:
# Querying Table static_datasets_state_abv
response = athena_client.start_query_execution(
                        QueryString='Select * From static_datasets_state_abv',
                        QueryExecutionContext={'Database': SCHEMA_NAME},
                        ResultConfiguration={
                            'OutputLocation': S3_STAGING_DIR,
                            'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}
                        }
                    )
static_datasets_state_abv = download_and_load_query_results(athena_client, response)

In [12]:
# Querying Table testing_data_states_daily
response = athena_client.start_query_execution(
                        QueryString='Select * From testing_data_states_daily',
                        QueryExecutionContext={'Database': SCHEMA_NAME},
                        ResultConfiguration={
                            'OutputLocation': S3_STAGING_DIR,
                            'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}
                        }
                    )
testing_data_states_daily = download_and_load_query_results(athena_client, response)

In [13]:
# Querying Table testing_data_us_daily
response = athena_client.start_query_execution(
                        QueryString='Select * From testing_data_us_daily',
                        QueryExecutionContext={'Database': SCHEMA_NAME},
                        ResultConfiguration={
                            'OutputLocation': S3_STAGING_DIR,
                            'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}
                        }
                    )
testing_data_us_daily = download_and_load_query_results(athena_client, response)

In [14]:
# Querying Table testing_data_us_total_latest
response = athena_client.start_query_execution(
                        QueryString='Select * From testing_data_us_total_latest',
                        QueryExecutionContext={'Database': SCHEMA_NAME},
                        ResultConfiguration={
                            'OutputLocation': S3_STAGING_DIR,
                            'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}
                        }
                    )
testing_data_us_total_latest = download_and_load_query_results(athena_client, response)

In [20]:
fact_cobid_1 = enigma_jhud[['fips', 'province_state', 'country_region', 'confirmed', 'deaths', 'recovered', 'active']]
fact_covid_2 = testing_data_states_daily[['fips', 'date', 'positive', 'negative', 'hospitalizedcurrently', 
                                         'hospitalized', 'hospitalizeddischarged']]

factCovid = pd.merge(fact_cobid_1, fact_covid_2, on = 'fips', how = 'inner')

In [21]:
factCovid.head()

Unnamed: 0,fips,province_state,country_region,confirmed,deaths,recovered,active,date,positive,negative,hospitalizedcurrently,hospitalized,hospitalizeddischarged
0,,,Austria,1646.0,4.0,9.0,,20210119,289939,,1066.0,,
1,,,Norway,1550.0,6.0,1.0,,20210119,289939,,1066.0,,
2,,,Belgium,1486.0,14.0,31.0,,20210119,289939,,1066.0,,
3,,Guangdong,China,1370.0,8.0,1313.0,,20210119,289939,,1066.0,,
4,,,Sweden,1279.0,10.0,1.0,,20210119,289939,,1066.0,,


In [26]:
dim_region_1 = enigma_jhud[['fips', 'province_state', 'country_region', 'latitude', 'longitude']]
dim_region_2 = nytimes_data_in_usa_us_county[['fips', 'county', 'state']]
dimRegion = pd.merge(dim_region_1, dim_region_2, on = 'fips', how = 'inner')

In [27]:
dimRegion.head()

Unnamed: 0,fips,province_state,country_region,latitude,longitude,county,state
0,,,Austria,47.516,14.55,New York City,New York
1,,,Austria,47.516,14.55,Unknown,Rhode Island
2,,,Austria,47.516,14.55,New York City,New York
3,,,Austria,47.516,14.55,Unknown,Rhode Island
4,,,Austria,47.516,14.55,New York City,New York


In [58]:
dimRegion = dimRegion[:100000]

In [59]:
dimRegion.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 100000 entries, 0 to 99999
Data columns (total 7 columns):
 #   Column          Non-Null Count   Dtype  
---  ------          --------------   -----  
 0   fips            0 non-null       float64
 1   province_state  41130 non-null   object 
 2   country_region  100000 non-null  object 
 3   latitude        100000 non-null  float64
 4   longitude       100000 non-null  float64
 5   county          100000 non-null  object 
 6   state           100000 non-null  object 
dtypes: float64(3), object(4)
memory usage: 6.1+ MB


In [82]:
dimHospital = rearc_usa_hospital_beds[['fips','state','address','hq_city','hq_state','hospital_name','hospital_type']]

In [83]:
dimHospital.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 830 entries, 0 to 829
Data columns (total 7 columns):
 #   Column         Non-Null Count  Dtype 
---  ------         --------------  ----- 
 0   fips           830 non-null    object
 1   state          830 non-null    object
 2   address        830 non-null    object
 3   hq_city        830 non-null    object
 4   hq_state       830 non-null    object
 5   hospital_name  830 non-null    object
 6   hospital_type  830 non-null    object
dtypes: object(7)
memory usage: 45.5+ KB


In [84]:
dimHospital['fips'] = dimHospital['fips'].str.replace('\W', '')
dimHospital['fips'] = dimHospital['fips'].astype('float64')
dimHospital.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 830 entries, 0 to 829
Data columns (total 7 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   fips           830 non-null    float64
 1   state          830 non-null    object 
 2   address        830 non-null    object 
 3   hq_city        830 non-null    object 
 4   hq_state       830 non-null    object 
 5   hospital_name  830 non-null    object 
 6   hospital_type  830 non-null    object 
dtypes: float64(1), object(6)
memory usage: 45.5+ KB


  """Entry point for launching an IPython kernel.
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  """Entry point for launching an IPython kernel.
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  


In [32]:
dimDate = testing_data_states_daily[['fips', 'date']]

In [34]:
dimDate['date'] = pd.to_datetime(dimDate['date'], format='%Y%m%d')
dimDate.head()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  """Entry point for launching an IPython kernel.


Unnamed: 0,fips,date
0,2.0,2021-03-07
1,1.0,2021-03-07
2,5.0,2021-03-07
3,60.0,2021-03-07
4,4.0,2021-03-07


In [35]:
dimDate['year'] = dimDate['date'].dt.year
dimDate['month'] = dimDate['date'].dt.month
dimDate['day_of_week'] = dimDate['date'].dt.dayofweek

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  """Entry point for launching an IPython kernel.
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  This is separate from the ipykernel package so we can avoid doing imports until


In [36]:
dimDate.head()

Unnamed: 0,fips,date,year,month,day_of_week
0,2.0,2021-03-07,2021,3,6
1,1.0,2021-03-07,2021,3,6
2,5.0,2021-03-07,2021,3,6
3,60.0,2021-03-07,2021,3,6
4,4.0,2021-03-07,2021,3,6


In [47]:
bucket = 'berlin-bucket'

In [48]:
csv_buffer = StringIO()
csv_buffer

<_io.StringIO at 0x13cce288>

In [50]:
# Loading factCovid into s3
factCovid.to_csv(csv_buffer)
s3 = boto3.resource('s3',
                    aws_access_key_id = AWS_ACCESS_KEY,
                    aws_secret_access_key = AWS_SECRET_KEY,
                    region_name = AWS_REGION
                    )
s3.Object(bucket, 'output/factCovid.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'E550YHZH4WS6YFWH',
  'HostId': 'gxNb6nxalG54tXBpWd9EffLK03TzwbuZ9BcYqOHo30vV0h9qS0upa43ZeN/S2fCwMtYhXz1ak/M=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'gxNb6nxalG54tXBpWd9EffLK03TzwbuZ9BcYqOHo30vV0h9qS0upa43ZeN/S2fCwMtYhXz1ak/M=',
   'x-amz-request-id': 'E550YHZH4WS6YFWH',
   'date': 'Mon, 14 Nov 2022 20:23:55 GMT',
   'etag': '"4436568d766fc82a5da3fa3f22d83956"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"4436568d766fc82a5da3fa3f22d83956"'}

In [55]:
# Loading dimDate into s3
csv_buffer = StringIO()
dimDate.to_csv(csv_buffer)
s3.Object(bucket, 'output/dimDate.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': '18XPTNWP24CDM865',
  'HostId': 'JpOtF6Pmu+jiVuS3ltWN41ZUlADa4Weru805UIGIkEkVf0nDPAS51oxw5iefJ/I5EgmQtjDSbig=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'JpOtF6Pmu+jiVuS3ltWN41ZUlADa4Weru805UIGIkEkVf0nDPAS51oxw5iefJ/I5EgmQtjDSbig=',
   'x-amz-request-id': '18XPTNWP24CDM865',
   'date': 'Mon, 14 Nov 2022 20:39:59 GMT',
   'etag': '"19eb0b77e7f7441c686829bc3fd1a906"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"19eb0b77e7f7441c686829bc3fd1a906"'}

In [85]:
# Loading dimHospital into s3
csv_buffer = StringIO()
dimHospital.to_csv(csv_buffer)
s3.Object(bucket, 'output/dimHospital.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'PZ3RCSHKV2WCG9R4',
  'HostId': 'Mt+jLicg4o7zWoxfb+rqLYNf4TMhRurXkXXk8e9oTIY3EXnnzjKP02fxP+z0XeK+EYs0Fb8urrE=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'Mt+jLicg4o7zWoxfb+rqLYNf4TMhRurXkXXk8e9oTIY3EXnnzjKP02fxP+z0XeK+EYs0Fb8urrE=',
   'x-amz-request-id': 'PZ3RCSHKV2WCG9R4',
   'date': 'Mon, 14 Nov 2022 22:27:36 GMT',
   'etag': '"b8fa4dc5615b81af666a880e43b11cc1"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"b8fa4dc5615b81af666a880e43b11cc1"'}

In [60]:
# Loading dimRegion into s3
csv_buffer = StringIO()
dimRegion.to_csv(csv_buffer)
s3.Object(bucket, 'output/dimRegion.csv').put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': '6TDJX7M69ZV536CE',
  'HostId': 'ifj0MELzHipowGQNivCYz0c7QndfrwNtTdXMqZqsVYAVIupp3qEIi3jhwUGloyuM50eTvPi2C1c=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'ifj0MELzHipowGQNivCYz0c7QndfrwNtTdXMqZqsVYAVIupp3qEIi3jhwUGloyuM50eTvPi2C1c=',
   'x-amz-request-id': '6TDJX7M69ZV536CE',
   'date': 'Mon, 14 Nov 2022 21:51:37 GMT',
   'etag': '"765b732bd0d72b46edba3adee409abf1"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"765b732bd0d72b46edba3adee409abf1"'}

In [61]:
# Inferring schema for factCovid table
factCovidSchema = pd.io.sql.get_schema(factCovid.reset_index(), 'factCovid')
print(''.join(factCovidSchema))

CREATE TABLE "factCovid" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "confirmed" REAL,
  "deaths" REAL,
  "recovered" REAL,
  "active" REAL,
  "date" INTEGER,
  "positive" INTEGER,
  "negative" REAL,
  "hospitalizedcurrently" REAL,
  "hospitalized" REAL,
  "hospitalizeddischarged" REAL
)


In [65]:
# Inferring schema for dimDate table
dimDateSchema = pd.io.sql.get_schema(dimDate.reset_index(), 'dimDate')
print(''.join(dimDateSchema))

CREATE TABLE "dimDate" (
"index" INTEGER,
  "fips" REAL,
  "date" TIMESTAMP,
  "year" INTEGER,
  "month" INTEGER,
  "day_of_week" INTEGER
)


In [86]:
# # Inferring schema for dimHospital table
dimHospitalSchema = pd.io.sql.get_schema(dimHospital.reset_index(), 'dimHospital')
print(''.join(dimHospitalSchema))

CREATE TABLE "dimHospital" (
"index" INTEGER,
  "fips" REAL,
  "state" TEXT,
  "address" TEXT,
  "hq_city" TEXT,
  "hq_state" TEXT,
  "hospital_name" TEXT,
  "hospital_type" TEXT
)


In [67]:
# # Inferring schema for dimRegion table
dimRegionSchema = pd.io.sql.get_schema(dimRegion.reset_index(), 'dimRegion')
print(''.join(dimRegionSchema))

CREATE TABLE "dimRegion" (
"index" INTEGER,
  "fips" REAL,
  "province_state" TEXT,
  "country_region" TEXT,
  "latitude" REAL,
  "longitude" REAL,
  "county" TEXT,
  "state" TEXT
)


In [24]:
# This script has also been deployed to Glue Job on AWS to create and populate tables in Redshift with data from S3.
import redshift_connector

conn = redshift_connector.connect(
                        host='redshift-cluster-1.ct4dksrjudtf.us-east-1.redshift.amazonaws.com',
                        database='dev',
                        user='myadmin',
                        password='.....',
                        port=5439
                        )
conn.autocommit=True
cur = conn.cursor()

# Creating tables in Redshift
cur.execute("""CREATE TABLE "factCovid"(
                    "index" INTEGER,
                    "fips" REAL,
                    "province_state" TEXT,
                    "country_region" TEXT,
                    "confirmed" REAL,
                    "deaths" REAL,
                    "recovered" REAL,
                    "active" REAL,
                    "date" INTEGER,
                    "positive" INTEGER,
                    "negative" REAL,
                    "hospitalizedcurrently" REAL,
                    "hospitalized" REAL,
                    "hospitalizeddischarged" REAL
                    ) 
            """)

cur.execute("""CREATE TABLE "dimDate"(
                    "index" INTEGER,
                    "fips" REAL,
                    "date" TIMESTAMP,
                    "year" INTEGER,
                    "month" INTEGER,
                    "day_of_week" INTEGER
                    )
            """)

cur.execute("""CREATE TABLE "dimHospital"(
                   "index" INTEGER,
                   "fips" REAL,
                   "state" TEXT,
                   "address" TEXT,
                   "hq_city" TEXT,
                   "hq_state" TEXT,
                   "hospital_name" TEXT,
                   "hospital_type" TEXT
                    )
            """)

cur.execute("""CREATE TABLE "dimRegion" (
                   "index" INTEGER,
                   "fips" REAL,
                   "province_state" TEXT,
                   "country_region" TEXT,
                   "latitude" REAL,
                   "longitude" REAL,
                   "county" TEXT,
                   "state" TEXT
                    )
            """)

# Copying data from S3 to Redshift using COPY command
cur.execute("""
        COPY factCovid FROM 's3://berlin-bucket/output/factCovid.csv'
        CREDENTIALS 'aws_iam_role=arn:aws:iam::781706432582:role/redshift_s3_access'
        DELIMITER ','
        REGION 'us-east-1'
        IGNOREHEADER 1
        """)

cur.execute("""
        COPY dimDate FROM 's3://berlin-bucket/output/dimDate.csv'
        CREDENTIALS 'aws_iam_role=arn:aws:iam::781706432582:role/redshift_s3_access'
        DELIMITER ','
        REGION 'us-east-1'
        IGNOREHEADER 1
        """)

cur.execute("""
        COPY dimHospital FROM 's3://berlin-bucket/output/dimHospital.csv'
        CREDENTIALS 'aws_iam_role=arn:aws:iam::781706432582:role/redshift_s3_access'
        DELIMITER ','
        REGION 'us-east-1'
        IGNOREHEADER 1
        """)

cur.execute("""
        COPY dimRegion FROM 's3://berlin-bucket/output/dimRegion.csv'
        CREDENTIALS 'aws_iam_role=arn:aws:iam::781706432582:role/redshift_s3_access'
        DELIMITER ','
        REGION 'us-east-1'
        IGNOREHEADER 1
        """)

<redshift_connector.cursor.Cursor at 0xb97ba48>