# Capstone project - Analytics in agriculture

### In this file, we can find the ETL process that our project follows to go from the raw data located in 'data/' to the curated data stored in our rdbms. For this first version the rdbms will be PostgreSQL

In [32]:
import psycopg2
from psycopg2 import extras
import pandas as pd
import time
import configparser
import json
import boto3
from botocore.exceptions import ClientError
import numpy as np

# 1. Extraction

### We are not starting from the very first stage. The extraction phase begins when downloading the data from the database, but since this first step needs to be done yearly due to de refresh schedule that this data is following, we did a manual step before the one described below (Manual step: download files > uncompress files)

### After the short explanation, we proceed with the extraction of the data. The data that our source provides are csv files. Since, the data is completely untouched, we will need to select the files/tables that are useful for our project and rearrange the structure of the columns because as we will see during the etl, the structure given is optimized for storage but not for a more advanced data model.

In [33]:
# Load original tabular data
crops_data = pd.read_csv("data/Production_Crops_E_All_Data.csv", encoding="ANSI")
trade_data = pd.read_csv("data/Trade_Crops_Livestock_E_All_Data.csv", encoding="ANSI")

# Load flags used in main tables (crops_data & trade_data)
crops_flags = pd.read_csv("data/Production_Crops_E_Flags.csv", encoding="ANSI")
trade_flags = pd.read_csv("data/Trade_Crops_Livestock_E_Flags.csv", encoding="ANSI")

# Load redshift credentials
with open("credentials/redshift.json", 'r') as j:
    redshift = json.loads(j.read())

# Load aws credentials
with open("credentials/aws.json", 'r') as j:
    aws = json.loads(j.read())

# Load s3 credentials 
with open("credentials/s3.json", 'r') as j:
    s3 = json.loads(j.read())

# Create object for further use when connecting to s3 bucket
s3_client = boto3.client('s3')

# Delete temporary variable j
del j

  has_raised = await self.run_ast_nodes(code_ast.body, cell_name,


# 2. Transformation

## Creation of dimension tables

In [34]:
#Create dimension table countries
dim_countries = crops_data[["Area Code", "Area"]].append(trade_data[["Area Code", "Area"]]).drop_duplicates()

#Create dimension table items
dim_items = crops_data[["Item Code", "Item"]].drop_duplicates()

# Create dimension table elements
dim_elements = crops_data[["Element Code", "Element"]].append(trade_data[["Element Code", "Element"]]).drop_duplicates()

#Create dimension table flags
dim_flags = crops_flags.append(trade_flags).drop_duplicates()

#Delete original flags dataframes
del crops_flags, trade_flags

## Clean dataframes

### Trade data has mixed crops and products data. to increase the performance of the next steps, first we will need to remove the rows that are not crops.

### Dimensions contain lots of duplciated data, therefore they will be trimmed as well

In [35]:
trade_data = trade_data[trade_data["Item Code"].isin(dim_items["Item Code"])]

## Modify Flags dataframe

### Flags table has "blank" primary key string associated to "Official data", but in the  fact table the value is blank. So it is needed to change the string "blank" to a blank string

In [36]:
dim_flags = dim_flags.replace("<blank>", "")

## rearrange the dataframe structures and creation of the fact table

### The design of this structure, will make the data grow horizontally, but for our SQL schema we can't keep a schema that is growing into this direction, so to rearrange the tables we have divided the data into 2 groups: keys and values. 
* keys: data that will be repeated after each iteration and serves as an identifier for the values
* values: data reported yearly and makes the dataframe grow each year 2 columns more

In [37]:
start = time.time()

#Separete keys from values, keys will be repeated when the data will be appended, values will be iterated to chain the data vertically (rows) instead of horizontally (columns)
raw_crop_keys = crops_data[["Area Code", "Item Code", "Element Code", "Unit"]]
raw_crop_values = crops_data.drop(labels = ["Area Code", "Area", "Item Code", "Item", "Element Code", "Element", "Unit"], axis = 1)

#Separete keys from values, keys will be repeated when the data will be appended, values will be iterated to chain the data vertically (rows) instead of horizontally (columns)
raw_trade_keys = trade_data[["Area Code", "Item Code", "Element Code", "Unit"]]
raw_trade_values = trade_data.drop(labels = ["Area Code", "Area", "Item Code", "Item", "Element Code", "Element", "Unit"], axis = 1)

# Simple data quality check. Values are always composed by (value, flag) pairs, if number of columns is odd, there is something wrong
if(len(raw_crop_values.columns) % 2 == 1):
    print(raw_crop_values.columns)
    raise Exception("Unexpected column found, columns number must be even as they consist of pairs. Please check out the dataframe structure")

# Simple data quality check. Values are always composed by (value, flag) pairs, if number of columns is odd, there is something wrong
if(len(raw_trade_values.columns) % 2 == 1):
    print(raw_trade_values.columns)
    raise Exception("Unexpected column found, columns number must be even as they consist of pairs. Please check out the dataframe structure")

# create empty dataframe for fact table
fact_crops = pd.DataFrame(columns = ["Area Code", "Item Code", "Element Code", "Unit", "Year", "Value", "Flag"])

# create empty dataframe for fact table
fact_trade = pd.DataFrame(columns = ["Area Code", "Item Code", "Element Code", "Unit", "Year", "Value", "Flag"])

# Iterate over each year of values to append the data to the fact table
for A, B in zip(*[iter(raw_crop_values)]*2):
    temp_aux_crops = raw_crop_keys
    temp_aux_crops['Year'] = A[1:]
    temp_aux_crops['Value'] = raw_crop_values[[A]]
    temp_aux_crops['Flag'] = raw_crop_values[[B]]
    print("evaluated from crops_data: ", A)
    fact_crops = fact_crops.append(temp_aux_crops)

# Iterate over each year of values to append the data to the fact table
for A, B in zip(*[iter(raw_trade_values)]*2):
    temp_aux_trade = raw_trade_keys
    temp_aux_trade['Year'] = A[1:]
    temp_aux_trade['Value'] = raw_trade_values[[A]]
    temp_aux_trade['Flag'] = raw_trade_values[[B]]
    print("evaluated from trade_data: ", A)
    fact_trade = fact_trade.append(temp_aux_trade)

end = time.time()

# Delete temporary variables
del A, B, raw_crop_keys, raw_crop_values, raw_trade_keys, raw_trade_values, temp_aux_crops, temp_aux_trade

# Delete original dataframes
del crops_data, trade_data

print("elapsed time: ", end - start)

# Delete chrono temporary variables
del start, end

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  temp_aux_crops['Year'] = A[1:]
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  temp_aux_crops['Value'] = raw_crop_values[[A]]
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  temp_aux_crops['Flag'] = raw_crop_values[[B]]
evaluated from crops_data:  Y1961
evaluated from crops_data:  Y1962
evaluated fro

## Combine the 2 main (fact) tables into one, appending each other.

### As backup and for logging purposes, we will also store the 2 fact tables.

In [57]:
# Append both fact tables into a common one
fact_fao = fact_crops.append(fact_trade)

# Store crops fact table as backup
fact_crops.to_csv("D:/Documents/GitHub/udacity-data-engineering-nanodegree/Capstone project - Analytics in agriculture/data/fact_crops.csv", index=False, header=False)

# Store trade fact table as backup
fact_trade.to_csv("D:/Documents/GitHub/udacity-data-engineering-nanodegree/Capstone project - Analytics in agriculture/data/fact_trade.csv", index=False, header=False)

# Store main fact table for further use
fact_fao.to_csv("D:/Documents/GitHub/udacity-data-engineering-nanodegree/Capstone project - Analytics in agriculture/data/fact_fao.csv", index=False, header=False)

# 3. Load

## Load the tables into our redshift cluster

In [39]:
# Connect to the redshift cluster
conn = psycopg2.connect(f"host={redshift['endpoint']} dbname={redshift['database']} user={redshift['username']} password={redshift['password']} port={redshift['port']}")
conn.autocommit = True
cur = conn.cursor()

## Drop tables if needed

In [None]:
def drop_table(table):
    '''
    Drops the table on the Redshift database

    Parameters:
        - table: Name of the table to delete
    '''

    try: 
        cur.execute(f"DROP TABLE {table};")
    except psycopg2.errors.ProgrammingError as e:
        print(e)

In [59]:
# Only execute these functions if the data is wrong in the database and you need to run again the ETL or you want to repopulate the tables

drop_table("countries")
drop_table("elements")
drop_table("elements")
drop_table("items")
drop_table("fact_fao")


Table "countries" does not exist

Table "elements" does not exist

Table "flags" does not exist

Table "items" does not exist

Table "fact_fao" does not exist

Table "fact_crops" does not exist

Table "fact_trade" does not exist



## Create tables if needed

In [60]:
#Create if not exsits fact table
cur.execute("""
    CREATE TABLE IF NOT EXISTS fact_fao(
        Area_code int,
        Item_Code int,
        Element_Code int,
        Unit varchar(50),
        Year int,
        Value float,
        Flag varchar(25)
    );
""")

# Create Countries dimension table if not exists
cur.execute("""
    CREATE TABLE IF NOT EXISTS countries(
        Area_Code int not null UNIQUE PRIMARY KEY,
        Area varchar(100)
    );
""")

# Create Elements dimension table if not exists
cur.execute("""
    CREATE TABLE IF NOT EXISTS elements(
        Element_Code int not null UNIQUE PRIMARY KEY,
        Element varchar(100)
    );
""")

# Create Flags dimension table if not exists
cur.execute("""
    CREATE TABLE IF NOT EXISTS flags(
        Flag varchar(25) UNIQUE PRIMARY KEY,
        Description varchar(100)
    );
""")

# Create Items dimension table if not exists
cur.execute("""
    CREATE TABLE IF NOT EXISTS items(
        Item_Code int not null UNIQUE PRIMARY KEY,
        Item varchar(100)
    );
""")

## Load data into dimension tables

In [61]:
# Load dimension tables
start = time.time()

try:
    psycopg2.extras.execute_values(cur, "INSERT INTO countries VALUES %s;", dim_countries.itertuples(index=False))
    
    psycopg2.extras.execute_values(cur, "INSERT INTO flags VALUES %s", dim_flags.itertuples(index=False))

    psycopg2.extras.execute_values(cur, "INSERT INTO elements VALUES %s", dim_elements.itertuples(index=False))

    psycopg2.extras.execute_values(cur, "INSERT INTO items VALUES %s", dim_items.itertuples(index=False))

except: 
    conn.rollback()

end = time.time()

print("Dimension tables lodaded!")
print("elapsed time: ", end - start)

# Delete chrono temporary variables
del start, end

Dimension tables lodaded!
elapsed time:  0.7024111747741699


## Upload fact table to S3 bucket

### `COPY` command from Redshift is only possible if fetching data from S3 bucket. So we proceed copying the file into S3

In [62]:
# Load fact_fao table
start = time.time()

try:
    response = s3_client.upload_file("D:/Documents/GitHub/udacity-data-engineering-nanodegree/Capstone project - Analytics in agriculture/data/fact_fao.csv", "arn:aws:s3:eu-west-2:412813684759:accesspoint/awsaccess", "fact_fao.csv")
except ClientError as e:
    logging.error(e)

end = time.time()

print("elapsed time: ", end - start)

elapsed time:  33.124932289123535


## Load data into fact table

In [65]:
start = time.time()

try:
    cur.execute(f"""
        COPY fact_fao
        FROM '{s3["base-connection"]}{s3["fact_fao"]}'
        ACCESS_KEY_ID '{aws["ACCESS_KEY_ID"]}'
        SECRET_ACCESS_KEY '{aws["SECRET_ACCESS_KEY"]}'
        DELIMITER ','
    """)
except ClientError as e:
    logging.error(e)

end = time.time()

print("elapsed time: ", end - start)

elapsed time:  13.558779954910278


In [71]:
# Data quality check to ensure the data is correclty populated
cur.execute("SELECT COUNT(1) FROM fact_fao")
print("nº of rows:", cur.fetchone()[0])

nº of rows: 9732699


In [2]:
# Creation of the data dictionary
data_dictionary = {
    "table_countries": {
        "table_type": "Dimension",
        "column_Area_Code": {
            "data type": "int",
            "description": "numerical identifier of country",
            "example": "132"
        },
        "column_Area": {
            "data type": "varchar(100)",
            "description": "name of the country",
            "example": "Spain"
        }
    },
    "table_items": {
        "table_type": "Dimension",
        "column_Item_Code": {
            "data type": "int",
            "description": "numerical identifier of item",
            "example": "56"
        },
        "column_Item": {
            "data type": "varchar(100)",
            "description": "item name, for our use case, an item is a crop",
            "example": "Wheat"
        }
    },
    "table_elements": {
        "table_type": "Dimension",
        "column_Element_Code": {
            "data type": "int",
            "description": "numerical identifier of element",
            "example": "5312"
        },
        "column_Element": {
            "data type": "varchar(100)",
            "description": "element name, in our use case, an element corresponds to the type of data",
            "example": "Area harvested"
        }
    },
    "table_flags": {
        "table_type": "Dimension",
        "column_Flag": {
            "data type": "varchar(25)",
            "description": "abreviation or representative string of flag as identifier",
            "example": "A"
        },
        "column_Description": {
            "data type": "varchar(100)",
            "description": "full description of flag",
            "example": "Aggregated data"
        }
    },
    "table_fact_fao": {
        "table_type": "Fact",
        "column_Area_Code": {
            "data type": "int",
            "description": "numerical identifier of country",
            "example": "132"
        },"column_Item_Code": {
            "data type": "int",
            "description": "numerical identifier of item",
            "example": "56"
        },"column_Element_Code": {
            "data type": "int",
            "description": "numerical identifier of element",
            "example": "5312"
        },
        "column_Unit": {
            "data type": "varchar(50)",
            "description": "measurement unit of value",
            "example": "tonnes"
        },
        "column_Year": {
            "data type": "int",
            "description": "year of the data reported",
            "example": "1961"
        },
        "column_Value": {
            "data type": "float",
            "description": "value to evaluate",
            "example": "4523.43"
        },
        "column_Flag": {
            "data type": "varchar(25)",
            "description": "abreviation or representative string of flag as identifier",
            "example": "A"
        },
    }
}

# Done!