# Ingestion to Redshift

Technical problem : it seems that you can't upload from UI interface the .db file to Amazon Redshift.

Solution : We can transform .db file to .parquet and ingest the data to S3 bucket then Redshift   

## Upload raw file to S3

In [11]:
# install the boto3 library for AWS
!pip install boto3

Collecting boto3
  Downloading boto3-1.35.47-py3-none-any.whl (139 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.2/139.2 kB[0m [31m6.1 MB/s[0m eta [36m0:00:00[0m
Collecting s3transfer<0.11.0,>=0.10.0
  Downloading s3transfer-0.10.3-py3-none-any.whl (82 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m82.6/82.6 kB[0m [31m10.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting botocore<1.36.0,>=1.35.47
  Downloading botocore-1.35.47-py3-none-any.whl (12.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.6/12.6 MB[0m [31m51.5 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Installing collected packages: botocore, s3transfer, boto3
Successfully installed boto3-1.35.47 botocore-1.35.47 s3transfer-0.10.3


In [12]:
# library for parsing .env files and protecting sensitive information
# don't forget to create a .env file with your AWS credentials and reference it in the .gitignore if the repository is public
!pip install python-dotenv



In [1]:
import os
from dotenv import load_dotenv
# Load environment variables from a .env file
load_dotenv()

# Retrieve environment variables
access_key = os.getenv("access_key_S3")
secret_key  = os.getenv("secret_key_S3")

In [2]:
from boto3.s3.transfer import S3Transfer
import boto3

# Upload the database to the S3 bucket
filepath = "calls_case_study.db"
bucket_name = "octopus-energy-ops"
folder_name = "raw_data"
filename = filepath


client = boto3.client('s3', aws_access_key_id=access_key,aws_secret_access_key=secret_key)
transfer = S3Transfer(client)
transfer.upload_file(filepath, bucket_name, folder_name+"/"+filename)

## Load from S3 and prepare data to Redshift

How to import sqlite (.db) to Python
https://stackoverflow.com/questions/62340498/open-database-files-db-using-python

In [11]:
import os

# Create an S3 client
s3_client = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)

# Specify the S3 bucket and file path
bucket_name = 'octopus-energy-ops'
file_path = 'raw_data/calls_case_study.db'

# create a new folder in the current directory
os.makedirs('raw_data', exist_ok=True)

s3_client.download_file(bucket_name, file_path, 'raw_data/calls_case_study.db')

In [12]:
import pandas as pd
import sqlite3

# Connect to the SQLite database
try:
    conn = sqlite3.connect("raw_data/calls_case_study.db")    
except Exception as e:
    print(e)

# Create a cursor to retrieve tables from the database
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
print(f"Table Name : {cursor.fetchall()}")

Table Name : [('call_reason',), ('account',), ('call',)]


In [14]:
# retrieve shape from the .db file

reason_columns = cursor.execute("PRAGMA table_info(call_reason);").fetchall()
reason_rows = cursor.execute("SELECT count(*) FROM call_reason;").fetchall()
print(f"Table Reason Shape : {len(reason_columns)}")
print(f'Number of Rows in Reason Table : {reason_rows}')
print(f"Table Reason Info : {reason_columns}")
print("***************")

account_columns = cursor.execute("PRAGMA table_info(account);").fetchall()
account_rows = cursor.execute("SELECT count(*) FROM account;").fetchall()
print(f"Table Account Shape : {len(reason_columns)}")
print('Number of Rows in Account Table : ', account_rows)
print(f"Table Account Info : {reason_columns}")
print("***************")

call_columns = cursor.execute("PRAGMA table_info(call);").fetchall()
call_rows = cursor.execute("SELECT count(*) FROM call;").fetchall()
print(f"Table Call Shape : {len(call_columns)}")
print('Number of Rows in Call Table : ', call_rows)
print(f"Table Call Info : {call_columns}")
print("***************")

Table Reason Shape : 3
Number of Rows in Reason Table : [(145,)]
Table Reason Info : [(0, 'id', 'INTEGER', 0, None, 0), (1, 'reason', 'TEXT', 0, None, 0), (2, 'category', 'TEXT', 0, None, 0)]
***************
Table Account Shape : 3
Number of Rows in Account Table :  [(75815,)]
Table Account Info : [(0, 'id', 'INTEGER', 0, None, 0), (1, 'reason', 'TEXT', 0, None, 0), (2, 'category', 'TEXT', 0, None, 0)]
***************
Table Call Shape : 7
Number of Rows in Call Table :  [(100000,)]
Table Call Info : [(0, 'id', 'TEXT', 0, None, 0), (1, 'called_at', 'TEXT', 0, None, 0), (2, 'agent_id', 'TEXT', 0, None, 0), (3, 'reason_id', 'TEXT', 0, None, 0), (4, 'talk_time', 'REAL', 0, None, 0), (5, 'direction', 'TEXT', 0, None, 0), (6, 'account_id', 'TEXT', 0, None, 0)]
***************


In [15]:
# Load the tables into pandas dataframes and confirm there is no missing data

df_callreason = pd.read_sql_query('SELECT * FROM call_reason', conn)
df_account = pd.read_sql_query('SELECT * FROM account', conn)
df_call = pd.read_sql_query('SELECT * FROM call', conn)

print("df_callreason")
print(df_callreason.info())
print("********************************")
print("df_account")
print(df_account.info())
print("********************************")
print("df_call")
print(df_call.info())

df_callreason
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 145 entries, 0 to 144
Data columns (total 3 columns):
 #   Column    Non-Null Count  Dtype 
---  ------    --------------  ----- 
 0   id        145 non-null    int64 
 1   reason    145 non-null    object
 2   category  145 non-null    object
dtypes: int64(1), object(2)
memory usage: 3.5+ KB
None
********************************
df_account
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 75815 entries, 0 to 75814
Data columns (total 3 columns):
 #   Column         Non-Null Count  Dtype 
---  ------         --------------  ----- 
 0   id             75815 non-null  int64 
 1   sales_channel  75815 non-null  object
 2   sign_up_date   75815 non-null  object
dtypes: int64(1), object(2)
memory usage: 1.7+ MB
None
********************************
df_call
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000 entries, 0 to 99999
Data columns (total 7 columns):
 #   Column      Non-Null Count   Dtype  
---  ------      ------

In [46]:
import os

# Create a directory to store the parquet files
os.makedirs('prep_data', exist_ok=True)

df_callreason.to_parquet('prep_data/call_reason.parquet')
df_account.to_parquet('prep_data/account.parquet')
df_call.to_parquet('prep_data/call.parquet')

In [50]:
from boto3.s3.transfer import S3Transfer
import boto3

for file in os.listdir('prep_data'):
    print(f'Current file processed : {file}')
    filepath = 'prep_data/' + file
    bucket_name = 'octopus-energy-ops'
    folder_name = 'prep_data'
    filename = file
    client = boto3.client('s3', aws_access_key_id=access_key,aws_secret_access_key=secret_key)
    transfer = S3Transfer(client)
    transfer.upload_file(filepath, bucket_name, folder_name+"/"+filename)
    print(f'{file} uploaded to S3 in the folder {folder_name}')
    print("***************")

Current file processed : account.parquet
account.parquet uploaded to S3 in the folder prep_data
***************
Current file processed : call_reason.parquet
call_reason.parquet uploaded to S3 in the folder prep_data
***************
Current file processed : call.parquet
call.parquet uploaded to S3 in the folder prep_data
***************


## Upload to Redshift

In [41]:
# dictionnary to map pandas dtypes to redshift dtypes

dtype_to_redshift = {
    "int64": "BIGINT",
    "float64": "DOUBLE PRECISION",
    "bool": "BOOLEAN",
    "datetime64[ns]": "TIMESTAMP",
    "timedelta[ns]": "INTERVAL",
    "object": "VARCHAR(MAX)",
    "category": "VARCHAR(MAX)",
    "int32": "INTEGER",
    "float32": "REAL"
}

In [39]:
import boto3
from dotenv import load_dotenv

# Load environment variables from a .env file
load_dotenv()

# Retrieve environment variables
access_key = os.getenv("access_key_S3")
secret_key  = os.getenv("secret_key_S3")

# Create an S3 client
s3_client = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)

# Specify the bucket name
bucket_name = 'octopus-energy-ops'

# List objects in the bucket
response = s3_client.list_objects_v2(Bucket=bucket_name)

folder_name = 'prep_data'

# List objects in the bucket to prepare COPY command in Redshift
aws_files = []
for obj in response['Contents']:
  if obj['Key'].startswith(folder_name):
    aws_uri = f"s3://{bucket_name}/{obj['Key']}"
    print(aws_uri)
    aws_files.append(aws_uri)

print(aws_files)

s3://octopus-energy-ops/prep_data/account.parquet
s3://octopus-energy-ops/prep_data/call.parquet
s3://octopus-energy-ops/prep_data/call_reason.parquet
['s3://octopus-energy-ops/prep_data/account.parquet', 's3://octopus-energy-ops/prep_data/call.parquet', 's3://octopus-energy-ops/prep_data/call_reason.parquet']


In [63]:
!pip install psycopg2

Collecting redshift_connector
  Downloading redshift_connector-2.1.3-py3-none-any.whl (129 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m130.0/130.0 kB[0m [31m6.7 MB/s[0m eta [36m0:00:00[0m
Collecting scramp<1.5.0,>=1.2.0
  Downloading scramp-1.4.5-py3-none-any.whl (12 kB)
Collecting asn1crypto>=1.5.1
  Downloading asn1crypto-1.5.1-py2.py3-none-any.whl (105 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m105.0/105.0 kB[0m [31m12.7 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: asn1crypto, scramp, redshift_connector
Successfully installed asn1crypto-1.5.1 redshift_connector-2.1.3 scramp-1.4.5


In [48]:
# create a mapping between parquet dtypes and redshift dtypes
list_types = []

# loop through the files in the prep_data folder
for file in os.listdir('prep_data'):
    table_dic =  {
        'table_name': file.split('.')[0],
        'columns': []
    }
    # read the parquet file with pandas
    df = pd.read_parquet(f'prep_data/{file}')
    for col in df.columns:
        table_dic['columns'].append({
            'name': col,
            # if we filter on a specific series, we can get the dtype with .dtype
            'type': dtype_to_redshift[str(df[col].dtype)]
        })
    list_types.append(table_dic)

print(list_types)

[{'table_name': 'account', 'columns': [{'name': 'id', 'type': 'BIGINT'}, {'name': 'sales_channel', 'type': 'VARCHAR(MAX)'}, {'name': 'sign_up_date', 'type': 'VARCHAR(MAX)'}]}, {'table_name': 'call_reason', 'columns': [{'name': 'id', 'type': 'BIGINT'}, {'name': 'reason', 'type': 'VARCHAR(MAX)'}, {'name': 'category', 'type': 'VARCHAR(MAX)'}]}, {'table_name': 'call', 'columns': [{'name': 'id', 'type': 'VARCHAR(MAX)'}, {'name': 'called_at', 'type': 'VARCHAR(MAX)'}, {'name': 'agent_id', 'type': 'VARCHAR(MAX)'}, {'name': 'reason_id', 'type': 'VARCHAR(MAX)'}, {'name': 'talk_time', 'type': 'DOUBLE PRECISION'}, {'name': 'direction', 'type': 'VARCHAR(MAX)'}, {'name': 'account_id', 'type': 'VARCHAR(MAX)'}]}]


In [57]:
import psycopg2
from dotenv import load_dotenv
import os

# Retrieve the Redshift credentials from the .env file
load_dotenv()
redshift_user = os.getenv("redshift_user")
redshift_password = os.getenv("redshift_password")
iam_role = os.getenv("iam_role")

# Connect to Redshift
conn = psycopg2.connect(
  host='octopus-energy-ops.202533530775.eu-west-3.redshift-serverless.amazonaws.com',
  port=5439,
  database='dev',
  user=redshift_user,
  password=redshift_password
)

# Create a cursor
cursor = conn.cursor()

# Create tables from the list_types
for table in list_types:
  table_name = table['table_name']
  columns = table['columns']
  # create a string with the columns and their types
  columns_str = ', '.join([f"{col['name']} {col['type']}" for col in columns])
  cursor.execute(f"CREATE TABLE IF NOT EXISTS dev.raw_data.{table_name} ({columns_str})")
  print(f"Table {table_name} created")
  conn.commit()

# Once tables schema are created, we can execute the COPY command for each file of the S3 bucket to populate the tables
for file in aws_files:
  table_name = file.split('/')[-1].split('.')[0]
  cursor.execute(f"COPY dev.raw_data.{table_name} FROM '{file}' IAM_ROLE '{iam_role}' FORMAT AS PARQUET")
  print(f"Table {table_name} populated")
  conn.commit()

# # Close the cursor and connection
cursor.close()
conn.close()

{'table_name': 'account', 'columns': [{'name': 'id', 'type': 'BIGINT'}, {'name': 'sales_channel', 'type': 'VARCHAR(MAX)'}, {'name': 'sign_up_date', 'type': 'VARCHAR(MAX)'}]}
id BIGINT, sales_channel VARCHAR(MAX), sign_up_date VARCHAR(MAX)
Table account created
{'table_name': 'call_reason', 'columns': [{'name': 'id', 'type': 'BIGINT'}, {'name': 'reason', 'type': 'VARCHAR(MAX)'}, {'name': 'category', 'type': 'VARCHAR(MAX)'}]}
id BIGINT, reason VARCHAR(MAX), category VARCHAR(MAX)
Table call_reason created
{'table_name': 'call', 'columns': [{'name': 'id', 'type': 'VARCHAR(MAX)'}, {'name': 'called_at', 'type': 'VARCHAR(MAX)'}, {'name': 'agent_id', 'type': 'VARCHAR(MAX)'}, {'name': 'reason_id', 'type': 'VARCHAR(MAX)'}, {'name': 'talk_time', 'type': 'DOUBLE PRECISION'}, {'name': 'direction', 'type': 'VARCHAR(MAX)'}, {'name': 'account_id', 'type': 'VARCHAR(MAX)'}]}
id VARCHAR(MAX), called_at VARCHAR(MAX), agent_id VARCHAR(MAX), reason_id VARCHAR(MAX), talk_time DOUBLE PRECISION, direction VARC