## Product Import Data ETL

## Load JSON with countries and CSV with codes

In [4]:
import pandas as pd

df_countries = pd.read_json('country_data.json')
df_codes = pd.read_csv('hs_codes_67d0ca02-86f0-4829-8568-389ab67a38e5.csv')
df_countries.head()
df_codes.head()

Unnamed: 0,Order,Level,Code,Parent,Code_comm,Parent.1,Description_complex,Description
0,1654555,1,0,,,,This classification has been uploaded in RAMON...,This classification has been uploaded in RAMON...
1,1654556,1,10011000090,,I,,SECTION I - LIVE ANIMALS; ANIMAL PRODUCTS,LIVE ANIMALS; ANIMAL PRODUCTS
2,1654557,2,10021000090,10011000000.0,1,I,CHAPTER 1 - LIVE ANIMALS,LIVE ANIMALS
3,1654558,3,10100000080,10021000000.0,1.01,1,"Live horses, asses, mules and hinnies","Live horses, asses, mules and hinnies"
4,1654559,4,10121000010,10100000000.0,,1.01,- Horses,


In [5]:
df_codes[df_codes['Level']==2].loc[:,'Description_complex']

2                                CHAPTER 1 - LIVE ANIMALS
52                 CHAPTER 2 - MEAT AND EDIBLE MEAT OFFAL
140     CHAPTER 3 - FISH AND CRUSTACEANS, MOLLUSCS AND...
416     CHAPTER 4 - DAIRY PRODUCE; BIRDS' EGGS; NATURA...
463     CHAPTER 5 - PRODUCTS OF ANIMAL ORIGIN, NOT ELS...
                              ...                        
7238    CHAPTER 93 - ARMS AND AMMUNITION; PARTS AND AC...
7264    CHAPTER 94 - FURNITURE; BEDDING, MATTRESSES, M...
7319    CHAPTER 95 - TOYS, GAMES AND SPORTS REQUISITES...
7362     CHAPTER 96 - MISCELLANEOUS MANUFACTURED ARTICLES
7428    CHAPTER 97 - WORKS OF ART, COLLECTORS' PIECES ...
Name: Description_complex, Length: 96, dtype: object

In [6]:
df_parents = df_codes[df_codes['Level']==2].copy()
df_parents.head()

Unnamed: 0,Order,Level,Code,Parent,Code_comm,Parent.1,Description_complex,Description
2,1654557,2,10021000090,10011000000.0,1,I,CHAPTER 1 - LIVE ANIMALS,LIVE ANIMALS
52,1654607,2,20021000090,10011000000.0,2,I,CHAPTER 2 - MEAT AND EDIBLE MEAT OFFAL,MEAT AND EDIBLE MEAT OFFAL
140,1654695,2,30021000090,10011000000.0,3,I,"CHAPTER 3 - FISH AND CRUSTACEANS, MOLLUSCS AND...","FISH AND CRUSTACEANS, MOLLUSCS AND OTHER AQUAT..."
416,1654971,2,40021000090,10011000000.0,4,I,CHAPTER 4 - DAIRY PRODUCE; BIRDS' EGGS; NATURA...,DAIRY PRODUCE; BIRDS' EGGS; NATURAL HONEY; EDI...
463,1655018,2,50021000090,10011000000.0,5,I,"CHAPTER 5 - PRODUCTS OF ANIMAL ORIGIN, NOT ELS...","PRODUCTS OF ANIMAL ORIGIN, NOT ELSEWHERE SPECI..."


#### DataTransformation

##### Data Cleansing

In [7]:
#pd.set_option('display.max_rows', None)
df_codes = df_codes[df_codes['Code_comm'].notnull()]
df_codes
df_parents

Unnamed: 0,Order,Level,Code,Parent,Code_comm,Parent.1,Description_complex,Description
2,1654557,2,10021000090,1.001100e+10,1,I,CHAPTER 1 - LIVE ANIMALS,LIVE ANIMALS
52,1654607,2,20021000090,1.001100e+10,2,I,CHAPTER 2 - MEAT AND EDIBLE MEAT OFFAL,MEAT AND EDIBLE MEAT OFFAL
140,1654695,2,30021000090,1.001100e+10,3,I,"CHAPTER 3 - FISH AND CRUSTACEANS, MOLLUSCS AND...","FISH AND CRUSTACEANS, MOLLUSCS AND OTHER AQUAT..."
416,1654971,2,40021000090,1.001100e+10,4,I,CHAPTER 4 - DAIRY PRODUCE; BIRDS' EGGS; NATURA...,DAIRY PRODUCE; BIRDS' EGGS; NATURAL HONEY; EDI...
463,1655018,2,50021000090,1.001100e+10,5,I,"CHAPTER 5 - PRODUCTS OF ANIMAL ORIGIN, NOT ELS...","PRODUCTS OF ANIMAL ORIGIN, NOT ELSEWHERE SPECI..."
...,...,...,...,...,...,...,...,...
7238,1661793,2,930021000090,9.300110e+11,93,XIX,CHAPTER 93 - ARMS AND AMMUNITION; PARTS AND AC...,ARMS AND AMMUNITION; PARTS AND ACCESSORIES THE...
7264,1661819,2,940021000090,9.400110e+11,94,XX,"CHAPTER 94 - FURNITURE; BEDDING, MATTRESSES, M...","FURNITURE; BEDDING, MATTRESSES, MATTRESS SUPPO..."
7319,1661874,2,950021000090,9.400110e+11,95,XX,"CHAPTER 95 - TOYS, GAMES AND SPORTS REQUISITES...","TOYS, GAMES AND SPORTS REQUISITES; PARTS AND A..."
7362,1661917,2,960021000090,9.400110e+11,96,XX,CHAPTER 96 - MISCELLANEOUS MANUFACTURED ARTICLES,MISCELLANEOUS MANUFACTURED ARTICLES


In [8]:
def clean_code(text):
    text = str(text)
    parent_code = None
    if len(text) == 11:
        code = text[:5]
        parent_code = text[:1]
    else:
        code = text[:6]
        parent_code = text[:2]
    try:
        parent = df_parents[df_parents['Code_comm'] == parent_code]['Description'].values[0]
    except:
        parent = None
    return (code, parent)

In [9]:
# Got tuple with clean codes (5-6 digits) and parent description
df_codes[['clean_code', 'parent_description']] = df_codes.apply(lambda x: clean_code(x['Code']), axis=1, result_type='expand')
df_codes = df_codes[df_codes['clean_code'].notnull()][['clean_code', 'Description', 'parent_description']]
df_codes['id_code'] = df_codes.index
df_codes['clean_code'] = df_codes['clean_code'].astype('int64')
df_codes.head()

Unnamed: 0,clean_code,Description,parent_description,id_code
1,10011,LIVE ANIMALS; ANIMAL PRODUCTS,LIVE ANIMALS,1
2,10021,LIVE ANIMALS,LIVE ANIMALS,2
3,10100,"Live horses, asses, mules and hinnies",LIVE ANIMALS,3
5,10121,Pure-bred breeding horses,LIVE ANIMALS,5
6,10129,Live horses (excl. pure-bred for breeding),LIVE ANIMALS,6


#### Cleaning Countries DataFrame

In [10]:
df_countries = df_countries[['alpha-3', 'country', 'region', 'sub-region']]
df_countries = df_countries[df_countries['alpha-3'].notnull()]
df_countries['id_country'] = df_countries.index + 1
df_countries

Unnamed: 0,alpha-3,country,region,sub-region,id_country
0,AFG,Afghanistan,Asia,Southern Asia,1
1,ALB,Albania,Europe,Southern Europe,2
2,DZA,Algeria,Africa,Northern Africa,3
3,AND,Andorra,Europe,Southern Europe,4
4,AGO,Angola,Africa,Sub-Saharan Africa,5
...,...,...,...,...,...
268,UMI,United States Minor Outlying Islands,Oceania,Micronesia,269
269,VGB,Virgin Islands (British),Americas,Latin America and the Caribbean,270
270,VIR,Virgin Islands (U.S.),Americas,Latin America and the Caribbean,271
271,WLF,Wallis and Futuna,Oceania,Polynesia,272


### Create dataframe from csv

In [12]:
import pandas as pd
trades_df = pd.read_csv('trades.csv', dtype={'country_code': 'category', 'comm_code': 'uint32', 'year': 'uint32', 'flow': 'category', 'trade_usd': 'float32', 'kg': 'float32', 'quantity': 'float32', 'quantity_name': 'category'})

#### Merge df_countries with "trades" Postgres table

In [13]:
# merge
df_trades_clean = trades_df.merge(df_codes[['clean_code', 'id_code']], how='left', left_on='comm_code', right_on='clean_code')

In [14]:
df_trades_clean_short = df_trades_clean.loc[::12,:].copy()

In [15]:
df_trades_clean = df_trades_clean_short.merge(df_countries[['alpha-3', 'id_country']], how='left', left_on='country_code', right_on='alpha-3')
df_trades_clean.tail()

Unnamed: 0,country_code,year,comm_code,flow,trade_usd,kg,quantity,quantity_name,clean_code,id_code,alpha-3,id_country
518025,AUS,2008,910990,Import,92284.0,2837.0,33051.0,Number of items,910990,7188,AUS,9
518026,AUS,2008,911180,Export,2700.0,967.0,4052.0,Number of items,911180,7198,AUS,9
518027,AUS,2008,911390,Export,131528.0,4250.0,4250.0,Weight in kilograms,911390,7206,AUS,9
518028,AUS,2007,910119,Export,1002215.0,1611.0,6552.0,Number of items,910119,7143,AUS,9
518029,AUS,2007,910211,Import,134055696.0,990136.0,4988213.0,Number of items,910211,7152,AUS,9


### Transactional Data Transformation: flow (import-export)

#### Clean Trades

In [16]:
def create_dimension(data, id_name):
    keys_list = []
    value = 1
    for _ in data:
        keys_list.append(value)
        value = value + 1
    return pd.DataFrame({id_name: keys_list, 'values': data})

In [17]:
quantity_df = create_dimension(df_trades_clean['quantity_name'].unique(), 'id_quantity')
flow_df = create_dimension(df_trades_clean['flow'].unique(), 'id_flow')
year_df = create_dimension(df_trades_clean['year'].unique(), 'id_year')

In [18]:
quantity_df
flow_df

Unnamed: 0,id_flow,values
0,1,Re-Import
1,2,Import
2,3,Export
3,4,Re-Export


In [19]:
df_trades_clean.count()

country_code     518030
year             518030
comm_code        518030
flow             518030
trade_usd        518030
kg               511548
quantity         500430
quantity_name    518030
clean_code       518030
id_code          518030
alpha-3          518030
id_country       518030
dtype: int64

In [20]:
#trades_df_clean = df_trades_clean.loc[::12,:].copy()
trades_df_clean = df_trades_clean.copy()

In [21]:
trades_df_clean = trades_df_clean.merge(quantity_df, how='left', left_on='quantity_name', right_on='values')
trades_df_clean = trades_df_clean.merge(flow_df, how='left', left_on='flow', right_on='values')
trades_df_clean = trades_df_clean.merge(year_df, how='left', left_on='year', right_on='values')

In [22]:
trades_df_clean['id_trades'] = trades_df_clean.index + 1

In [23]:
trades_df_clean.count()

country_code     518030
year             518030
comm_code        518030
flow             518030
trade_usd        518030
kg               511548
quantity         500430
quantity_name    518030
clean_code       518030
id_code          518030
alpha-3          518030
id_country       518030
id_quantity      518030
values_x         518030
id_flow          518030
values_y         518030
id_year          518030
values           518030
id_trades        518030
dtype: int64

#### Fact Table (trades)

In [24]:
trades_df_final = trades_df_clean[['id_trades', 'trade_usd', 'kg', 'quantity', 'id_code', 'id_country', 'id_quantity', 'id_flow', 'id_year']].copy()

In [25]:
trades_df_final.count()

id_trades      10793
trade_usd      10793
kg             10666
quantity       10450
id_code        10793
id_country     10793
id_quantity    10793
id_flow        10793
id_year        10793
dtype: int64

#### Dimensions: df_countries and df_codes

In [26]:
df_countries = df_countries[['id_country', 'alpha-3', 'country', 'region', 'sub-region']].copy()

In [27]:
df_codes = df_codes[['id_code', 'clean_code', 'Description', 'parent_description']].copy()

### Load

In [95]:
trades_df_final.to_csv('target/trades.csv', index=False, sep='|')
df_countries.to_csv('target/countries.csv', index=False, sep='|')
df_codes.to_csv('target/codes.csv', index=False, sep='|')
quantity_df.to_csv('target/quantity.csv', index=False, sep='|')
flow_df.to_csv('target/flow.csv', index=False, sep='|')
year_df.to_csv('target/years.csv', index=False, sep='|')

In [122]:
import os
import boto3
import redshift_connector

client = boto3.client(
    's3',
    aws_access_key_id = os.environ.get('aws_access_key_id'),
    aws_secret_access_key = os.environ.get('aws_secret_access_key')
)

conn = redshift_connector.connect(
    host = os.environ.get('redshift_host'),
    database = os.environ.get('redshift_database'),
    port = 5439,
    user = os.environ.get('redshift_user'),
    password = os.environ.get('redshift_password')
)

cursor = conn.cursor()

In [111]:
conn.close()

In [109]:
# python function to load data into s3 bucket through COPY sentence of redshift
def load_file(file_name):
    table_name = file_name.split('.')[0]
    client.upload_file(
        Filename='target/{}'.format(file_name),
        Bucket = 'bucket-etl-redshift',
        Key='course_etl_target/{}'.format(file_name)
    )
# COPY sentence of redshift
    sentence = '''copy public.{} from 's3://bucket-etl-redshift/course_etl_target/{}' credentials 'aws_access_key_id={};aws_secret_access_key={}' csv delimiter '|' region 'us-west-2' ignoreheader 1'''.format(table_name, file_name, os.environ.get('aws_access_key_id'), os.environ.get('aws_secret_access_key'))
    try:
        cursor.execute(sentence)
        print('Ok en la tabla ' + table_name)
    except:
        print('Error en la tabla ' + table_name)

In [123]:
# Load data into the previously created tables with the sql script
load_file('trades.csv')
load_file('countries.csv')
load_file('codes.csv')
load_file('quantity.csv')
load_file('flow.csv')
load_file('years.csv')

Ok en la tabla years


In [125]:
# Execute changes
conn.commit()

In [126]:
# Close the connection
conn.close()