In [1]:
# Importing libraries
import boto3
import pandas as pd
import time
from io import StringIO

In [2]:
# Connection parameters
AWS_ACCESS_KEY = "access_key"
AWS_SECRET_KEY = "secret_access_key"
AWS_REGION = "us-east-1"
SCHEMA_NAME = "agricultural_production_database"
S3_STAGING_DIR = "s3://agricultural-production/output/"
S3_BUCKET_NAME = "agricultural-production"
S3_OUTPUT_DIRECTORY = "output"

In [3]:
# Athena connection
athena_client = boto3.client(
    "athena",
    aws_access_key_id=AWS_SECRET_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION
)

In [4]:
Dict = {}
def download_and_load_query_results(
        client: boto3.client, query_response: Dict
) -> pd.DataFrame:
    while True:
        try:
            client.get_query_results(
                QueryExecutionId=query_response["QueryExecutionId"]
            )
            break
        except Exception as err:
            if "not yet finished" in str(err):
                time.sleep(0.001)
            else:
                raise err
    temp_file_location: str = "athena_query_results.csv"
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_KEY,
        region_name=AWS_REGION,
    )
    s3_client.download_file(
        S3_BUCKET_NAME,
        f"{S3_OUTPUT_DIRECTORY}/{query_response['QueryExecutionId']}.csv",
        temp_file_location,
    )
    return pd.read_csv(temp_file_location)

### Cheese Data

In [5]:
# Cheese production table
athena_client = boto3.client(
    'athena',
    region_name='us-east-1',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY
)

response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM cheese_production",
    QueryExecutionContext ={"Database":SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation":S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)


In [6]:
response

{'QueryExecutionId': 'a85de3e5-8e01-48cc-abb8-2147e62e95d0',
 'ResponseMetadata': {'RequestId': 'dd567b23-6437-4095-a78a-5462d6a0fb70',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Fri, 19 Jul 2024 12:42:36 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '59',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'dd567b23-6437-4095-a78a-5462d6a0fb70'},
  'RetryAttempts': 0}}

In [7]:
# Cheese Production data
cheese_production = download_and_load_query_results(athena_client, response)
cheese_production.head()

Unnamed: 0,year,period,geo_level,state_ansi,commodity_id,domain,value
0,2023,APR,STATE,6.0,6,TOTAL,"""208"
1,2023,APR,STATE,16.0,6,TOTAL,"""86"
2,2023,APR,STATE,17.0,6,TOTAL,"""5"
3,2023,APR,STATE,19.0,6,TOTAL,"""31"
4,2023,APR,STATE,27.0,6,TOTAL,"""69"


### Coffee Data

In [8]:
# Coffee production table
athena_client = boto3.client(
    'athena',
    region_name='us-east-1',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY
)

response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM coffee_production",
    QueryExecutionContext ={"Database":SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation":S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)


In [9]:
# Coffee Production data
coffee_production = download_and_load_query_results(athena_client, response)
coffee_production.head()

Unnamed: 0,year,period,geo_level,state_ansi,commodity_id,value
0,2016,YEAR,STATE,15,1,"""5"
1,2015,YEAR,STATE,15,1,"""6"
2,2014,YEAR,STATE,15,1,"""7"
3,2013,YEAR,STATE,15,1,"""8"
4,2012,YEAR,STATE,15,1,"""7"


### Eggs Data

In [10]:
# Eggs production table
athena_client = boto3.client(
    'athena',
    region_name='us-east-1',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY
)

response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM egg_production",
    QueryExecutionContext ={"Database":SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation":S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)


In [11]:
# Eggs Production data
egg_production = download_and_load_query_results(athena_client, response)
egg_production.head()

Unnamed: 0,year,period,geo_level,state_ansi,commodity_id,domain,value
0,2023,APR,STATE,1.0,7,TOTAL,"""224"
1,2023,APR,STATE,5.0,7,TOTAL,"""319"
2,2023,APR,STATE,6.0,7,TOTAL,"""271"
3,2023,APR,STATE,8.0,7,TOTAL,"""52"
4,2023,APR,STATE,13.0,7,TOTAL,"""437"


### Honey Data

In [12]:
# Honey production table
athena_client = boto3.client(
    'athena',
    region_name='us-east-1',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY
)

response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM honey_production",
    QueryExecutionContext ={"Database":SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation":S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)


In [13]:
# Honey Production data
honey_production = download_and_load_query_results(athena_client, response)
honey_production.head()

Unnamed: 0,year,geo_level,state_ansi,commodity_id,value
0,2022,STATE,1.0,2,"""400"
1,2022,STATE,4.0,2,"""1"
2,2022,STATE,5.0,2,"""1"
3,2022,STATE,6.0,2,"""11"
4,2022,STATE,8.0,2,"""1"


### Milk Data

In [14]:
# Milk production table
athena_client = boto3.client(
    'athena',
    region_name='us-east-1',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY
)

response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM milk_production",
    QueryExecutionContext ={"Database":SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation":S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)


In [15]:
# Milk Production data
milk_production = download_and_load_query_results(athena_client, response)
milk_production.head()

Unnamed: 0,year,period,geo_level,state_ansi,commodity_id,domain,value
0,2023,APR,STATE,4.0,5,TOTAL,"""428"
1,2023,APR,STATE,6.0,5,TOTAL,"""3"
2,2023,APR,STATE,8.0,5,TOTAL,"""444"
3,2023,APR,STATE,12.0,5,TOTAL,"""166"
4,2023,APR,STATE,13.0,5,TOTAL,"""180"


### State Data

In [16]:
#  State table
athena_client = boto3.client(
    'athena',
    region_name='us-east-1',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY
)

response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM state_information",
    QueryExecutionContext ={"Database":SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation":S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)


In [17]:
# States data
state_information = download_and_load_query_results(athena_client, response)
state_information.head()

Unnamed: 0,state,state_ansi
0,ALABAMA,1
1,ALASKA,2
2,ARIZONA,4
3,ARKANSAS,5
4,CALIFORNIA,6


### Yogurt Data

In [18]:
# Yogurt production table
athena_client = boto3.client(
    'athena',
    region_name='us-east-1',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY
)

response = athena_client.start_query_execution(
    QueryString = "SELECT * FROM yogurt_production",
    QueryExecutionContext ={"Database":SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation":S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)


In [19]:
# Yogurt Production data
yogurt_production = download_and_load_query_results(athena_client, response)
yogurt_production.head()

Unnamed: 0,year,period,geo_level,state_ansi,commodity_id,domain,value
0,2022,YEAR,STATE,6,4,TOTAL,"""377"
1,2022,YEAR,STATE,36,4,TOTAL,"""793"
2,2021,YEAR,STATE,6,4,TOTAL,"""374"
3,2021,YEAR,STATE,36,4,TOTAL,"""774"
4,2020,YEAR,STATE,6,4,TOTAL,"""354"


## ETL (Data Transformation)

In [20]:
# Loading all the files
cheese_df = cheese_production
milk_df = milk_production
coffee_df = coffee_production
honey_df = honey_production
egg_df = egg_production
yogurt_df = yogurt_production
states_df = state_information

### Creating Dimension Tables

In [21]:
# Create State Dimension Table
state_dim = states_df[['state', 'state_ansi']].drop_duplicates()


In [22]:
# Define the function to create the Time Dimension Table
def create_time_dim(df):
    # Get unique values for year and period
    unique_years = df['year'].unique()
    unique_periods = df['period'].unique()
    
    # Create a DataFrame for the time dimension
    time_dim = pd.DataFrame({
        'Year': pd.Series(unique_years).repeat(len(unique_periods)).reset_index(drop=True),
        'Period': unique_periods.tolist() * len(unique_years)
    }).drop_duplicates()
    
    return time_dim

# Assuming cheese_df has 'Year' and 'Period' columns
time_dim = create_time_dim(cheese_df)
print(time_dim)

     Year Period
0    2023    APR
1    2023    FEB
2    2023    JAN
3    2023    MAR
4    2023    AUG
..    ...    ...
697  1970    MAY
698  1970    NOV
699  1970    OCT
700  1970    SEP
701  1970   YEAR

[702 rows x 2 columns]


In [23]:
# Create Commodity Dimension Table
commodity_dim = pd.DataFrame({
    'Comodity_id': range(1, 7),  
    'Comodity_Name': ['Cheese', 'Milk', 'Coffee', 'Honey', 'Eggs', 'Yogurt']
})

## Creating Fact Tables

In [24]:
# Create Cheese Fact Table
cheese_fact = cheese_df[['year', 'period', 'geo_level', 'state_ansi', 'commodity_id', 'value']]
cheese_fact = cheese_fact.rename(columns={'Value': 'Cheese_Value'})


In [25]:
# Create Milk Fact Table
milk_fact = milk_df[['year', 'period', 'geo_level', 'state_ansi', 'commodity_id', 'value']]
milk_fact = milk_fact.rename(columns={'Value': 'Milk_Value'})


In [26]:
# Create Coffee Fact Table
coffee_fact = coffee_df[['year', 'period', 'geo_level', 'state_ansi', 'commodity_id', 'value']]
coffee_fact = coffee_fact.rename(columns={'Value': 'Coffee_Value'})


In [27]:
# Create Honey Fact Table
honey_fact = honey_df[['year', 'geo_level', 'state_ansi', 'commodity_id', 'value']]
honey_fact = honey_fact.rename(columns={'Value': 'Honey_Value'})


In [28]:
# Create Egg Fact Table
egg_fact = egg_df[['year', 'period', 'geo_level', 'state_ansi', 'commodity_id', 'value']]
egg_fact = egg_fact.rename(columns={'Value': 'Egg_Value'})


In [29]:
# Create Yogurt Fact Table
yogurt_fact = yogurt_df[['year', 'period', 'geo_level', 'state_ansi', 'commodity_id', 'value']]
yogurt_fact = yogurt_fact.rename(columns={'Value': 'Yogurt_Value'})


In [30]:
# Add source column to each fact table
cheese_fact['Source'] = 'Cheese'
milk_fact['Source'] = 'Milk'
coffee_fact['Source'] = 'Coffee'
honey_fact['Source'] = 'Honey'
egg_fact['Source'] = 'Egg'
yogurt_fact['Source'] = 'Yogurt'

# Concatenate all fact tables into one
all_facts = pd.concat([cheese_fact, milk_fact, coffee_fact, honey_fact, egg_fact, yogurt_fact], ignore_index=True)


In [35]:
# Save dimension tables
state_dim.to_csv('state_dim.csv', index=False)
time_dim.to_csv('time_dim.csv', index=False)
commodity_dim.to_csv('commodity_dim.csv', index=False)

# Save combined fact table
all_facts.to_csv('all_facts.csv', index=False)


## Save transformed data into S3 Bucket

In [36]:
csv_buffer = StringIO
csv_buffer

_io.StringIO

In [38]:
# Connection parameters to push the all_facts table to S3 Bucket
ACCESS_KEY_ID = "access_key"
SECRET_ACCESS_KEY = "secret_access_key"
REGION = "us-east-1"

BUCKET_NAME = "agricultural-production"
FileName = 'cleandata/all_facts.csv'
csv_buffer = StringIO()
all_facts.to_csv(csv_buffer, index=False)

s3csv = boto3.client('s3',
                     region_name = REGION,
                     aws_access_key_id = ACCESS_KEY_ID,
                     aws_secret_access_key = SECRET_ACCESS_KEY
                     )
response = s3csv.put_object(Body=csv_buffer.getvalue(),
                            Bucket = BUCKET_NAME,
                            Key=FileName)

In [39]:
# Connection parameters to push the time_dim table to S3 Bucket
ACCESS_KEY_ID = "access_key"
SECRET_ACCESS_KEY = "secret_access_key"
REGION = "us-east-1"

BUCKET_NAME = "agricultural-production"
FileName = 'cleandata/time_dim.csv'
csv_buffer = StringIO()
time_dim.to_csv(csv_buffer, index=False)

s3csv = boto3.client('s3',
                     region_name = REGION,
                     aws_access_key_id = ACCESS_KEY_ID,
                     aws_secret_access_key = SECRET_ACCESS_KEY
                     )
response = s3csv.put_object(Body=csv_buffer.getvalue(),
                            Bucket = BUCKET_NAME,
                            Key=FileName)

## Glue Deployment

In [40]:
# Time dimension table
time_dimensionsql = pd.io.sql.get_schema(time_dim, 'time_dimension')
print(''.join(time_dimensionsql))

CREATE TABLE "time_dimension" (
"Year" INTEGER,
  "Period" TEXT
)


In [41]:
# All facts dimension table
all_factssql = pd.io.sql.get_schema(all_facts, 'all_facts')
print(''.join(all_factssql))

CREATE TABLE "all_facts" (
"year" INTEGER,
  "period" TEXT,
  "geo_level" TEXT,
  "state_ansi" REAL,
  "commodity_id" INTEGER,
  "value" TEXT,
  "Source" TEXT
)


## Install RedShift Dependancies

In [43]:
import redshift_connector

In [None]:
# agricultural_engineer
# Agricultural1

In [45]:
# Connect to RedShift
conn = redshift_connector.connect(
    host = 'redshift-cluster-1.us-east-1.redshift.amazonaws.com',
    database = 'dev',
    user ='agricultural_engineer',
    password='Agricultural1',
    timeout=60
)

In [46]:
conn.autocommit = True

In [47]:
cursor = redshift_connector.Cursor = conn.cursor()

In [48]:
cursor.execute("""
CREATE TABLE "all_facts" (
"year" INTEGER,
  "period" TEXT,
  "geo_level" TEXT,
  "state_ansi" REAL,
  "commodity_id" INTEGER,
  "value" TEXT,
  "Source" TEXT
)
""")

<redshift_connector.cursor.Cursor at 0x1b353f81e20>

In [49]:
cursor.execute("""
CREATE TABLE "time_dimension" (
"Year" INTEGER,
  "Period" TEXT
)               
""")

<redshift_connector.cursor.Cursor at 0x1b353f81e20>

In [51]:
cursor.execute("""
copy all_facts from 's3://agricultural-production/cleandata/all_facts.csv'
credentials 'aws_iam_role=arn:aws:iam::10021:role/agricultural-production-redshift-s3-role'
delimiter ','
region 'us-east-1'
IGNOREHEADER 1
""")

<redshift_connector.cursor.Cursor at 0x1b353f81e20>

s3://agricultural-production/cleandata/time_dim.csv