In [3]:
import pandas as pd
from sqlalchemy import create_engine
import os

In [4]:
engine = create_engine(f"postgresql+psycopg2://{os.environ["POSTGRES_USERNAME"]}:{os.environ["POSTGRES_PASSWORD"]}@localhost/postgres")

In [5]:
df_trades = pd.read_sql("SELECT * FROM trades", engine)

In [6]:
df_hs_codes = pd.read_csv("./hs_codes.csv")

In [7]:
df_country_data = pd.read_json("./country_data.json")

In [8]:
df_hs_codes2 = df_hs_codes.copy()

In [9]:
df_parents = df_hs_codes2[df_hs_codes2["Level"] == 2]
df_parents

Unnamed: 0,Order,Level,Code,Parent,Code_comm,Parent.1,Description_complex,Description
2,1654557,2,10021000090,1.001100e+10,1,I,CHAPTER 1 - LIVE ANIMALS,LIVE ANIMALS
52,1654607,2,20021000090,1.001100e+10,2,I,CHAPTER 2 - MEAT AND EDIBLE MEAT OFFAL,MEAT AND EDIBLE MEAT OFFAL
140,1654695,2,30021000090,1.001100e+10,3,I,"CHAPTER 3 - FISH AND CRUSTACEANS, MOLLUSCS AND...","FISH AND CRUSTACEANS, MOLLUSCS AND OTHER AQUAT..."
416,1654971,2,40021000090,1.001100e+10,4,I,CHAPTER 4 - DAIRY PRODUCE; BIRDS' EGGS; NATURA...,DAIRY PRODUCE; BIRDS' EGGS; NATURAL HONEY; EDI...
463,1655018,2,50021000090,1.001100e+10,5,I,"CHAPTER 5 - PRODUCTS OF ANIMAL ORIGIN, NOT ELS...","PRODUCTS OF ANIMAL ORIGIN, NOT ELSEWHERE SPECI..."
...,...,...,...,...,...,...,...,...
7238,1661793,2,930021000090,9.300110e+11,93,XIX,CHAPTER 93 - ARMS AND AMMUNITION; PARTS AND AC...,ARMS AND AMMUNITION; PARTS AND ACCESSORIES THE...
7264,1661819,2,940021000090,9.400110e+11,94,XX,"CHAPTER 94 - FURNITURE; BEDDING, MATTRESSES, M...","FURNITURE; BEDDING, MATTRESSES, MATTRESS SUPPO..."
7319,1661874,2,950021000090,9.400110e+11,95,XX,"CHAPTER 95 - TOYS, GAMES AND SPORTS REQUISITES...","TOYS, GAMES AND SPORTS REQUISITES; PARTS AND A..."
7362,1661917,2,960021000090,9.400110e+11,96,XX,CHAPTER 96 - MISCELLANEOUS MANUFACTURED ARTICLES,MISCELLANEOUS MANUFACTURED ARTICLES


In [10]:
def cleaning_codes(code):
    code = str(code)
    code_comm = None
    if len(code) == 11:
        code_comm = code[:5]
        parent = code[:1]
    else:
        code_comm = code[:6]
        parent = code[:2]
    try:
        description = df_parents[df_parents["Code_comm"] == parent][
            "Description"
        ].values[0]
    except Exception:
        description = None

    return (code_comm, parent, description)


df_hs_codes2[["Code_comm", "Parent", "Description"]] = df_hs_codes2.apply(
    lambda r: cleaning_codes(r["Code"]), axis=1, result_type="expand"
)
df_clean_codes = df_hs_codes2[
    ["Order", "Level", "Code", "Code_comm", "Parent", "Description", "Description_complex"]
]

In [11]:
df_trades.head()

Unnamed: 0,country_code,year,comm_code,flow,trade_usd,kg,quantity,quantity_name
0,SYC,1998,890200,Import,1431426.0,0.0,23000.0,Number of items
1,SYC,1998,890310,Import,31406.0,0.0,2545.0,Number of items
2,SYC,1998,890310,Export,950.0,0.0,300.0,Number of items
3,SYC,1998,890310,Re-Export,950.0,0.0,300.0,Number of items
4,SYC,1998,890391,Import,18251.0,0.0,450.0,Number of items


In [12]:
df_clean_codes.head()
# df_trades.head()
df_clean_codes = df_clean_codes[df_clean_codes["Description"].notnull()]

In [13]:
df_clean_codes.loc[:, "code_id"] = df_clean_codes.index
df_clean_codes.head()

Unnamed: 0,Order,Level,Code,Code_comm,Parent,Description,Description_complex,code_id
1,1654556,1,10011000090,10011,1,LIVE ANIMALS,SECTION I - LIVE ANIMALS; ANIMAL PRODUCTS,1
2,1654557,2,10021000090,10021,1,LIVE ANIMALS,CHAPTER 1 - LIVE ANIMALS,2
3,1654558,3,10100000080,10100,1,LIVE ANIMALS,"Live horses, asses, mules and hinnies",3
4,1654559,4,10121000010,10121,1,LIVE ANIMALS,- Horses,4
5,1654560,5,10121000080,10121,1,LIVE ANIMALS,-- Pure-bred breeding animals,5


In [14]:
df_country_data.loc[:, "country_id"] = df_country_data.index + 1
df_country_data.head()
#df_clean_codes.head()

Unnamed: 0,country,images_file,image_url,alpha-2,alpha-3,country-code,iso_3166-2,region,sub-region,intermediate-region,region-code,sub-region-code,intermediate-region-code,country_id
0,Afghanistan,Flag_of_Afghanistan.svg,https://upload.wikimedia.org/wikipedia/commons...,AF,AFG,4.0,ISO 3166-2:AF,Asia,Southern Asia,,142.0,34.0,,1
1,Albania,Flag_of_Albania.svg,https://upload.wikimedia.org/wikipedia/commons...,AL,ALB,8.0,ISO 3166-2:AL,Europe,Southern Europe,,150.0,39.0,,2
2,Algeria,Flag_of_Algeria.svg,https://upload.wikimedia.org/wikipedia/commons...,DZ,DZA,12.0,ISO 3166-2:DZ,Africa,Northern Africa,,2.0,15.0,,3
3,Andorra,Flag_of_Andorra.svg,https://upload.wikimedia.org/wikipedia/commons...,AD,AND,20.0,ISO 3166-2:AD,Europe,Southern Europe,,150.0,39.0,,4
4,Angola,Flag_of_Angola.svg,https://upload.wikimedia.org/wikipedia/commons...,AO,AGO,24.0,ISO 3166-2:AO,Africa,Sub-Saharan Africa,Middle Africa,2.0,202.0,17.0,5


In [15]:
df_clean_trades = pd.merge(
    left=df_trades,
    right=df_country_data[["alpha-3", "country_id"]],
    left_on="country_code",
    right_on="alpha-3",
    how="left",
)
df_clean_trades

Unnamed: 0,country_code,year,comm_code,flow,trade_usd,kg,quantity,quantity_name,alpha-3,country_id
0,SYC,1998,890200,Import,1431426.0,0.0,23000.0,Number of items,SYC,155
1,SYC,1998,890310,Import,31406.0,0.0,2545.0,Number of items,SYC,155
2,SYC,1998,890310,Export,950.0,0.0,300.0,Number of items,SYC,155
3,SYC,1998,890310,Re-Export,950.0,0.0,300.0,Number of items,SYC,155
4,SYC,1998,890391,Import,18251.0,0.0,450.0,Number of items,SYC,155
...,...,...,...,...,...,...,...,...,...,...
6216348,SYC,1999,890590,Import,3399.0,0.0,283.0,Number of items,SYC,155
6216349,SYC,1999,890600,Import,816.0,0.0,199.0,Number of items,SYC,155
6216350,SYC,1999,890710,Import,31387.0,0.0,1325.0,Number of items,SYC,155
6216351,SYC,1999,890790,Import,8749.0,0.0,1566.0,Number of items,SYC,155


In [16]:
df_clean_codes["Code_comm"] = df_clean_codes["Code_comm"].astype("int64")
df_trades.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6216353 entries, 0 to 6216352
Data columns (total 8 columns):
 #   Column         Dtype  
---  ------         -----  
 0   country_code   object 
 1   year           int64  
 2   comm_code      int64  
 3   flow           object 
 4   trade_usd      float64
 5   kg             float64
 6   quantity       float64
 7   quantity_name  object 
dtypes: float64(3), int64(2), object(3)
memory usage: 379.4+ MB


In [17]:
df_clean_trades = pd.merge(
    left=df_clean_trades,
    right=df_clean_codes[["Code_comm", "code_id"]],
    left_on="comm_code",
    right_on="Code_comm",
    how="left",
)
df_clean_trades

Unnamed: 0,country_code,year,comm_code,flow,trade_usd,kg,quantity,quantity_name,alpha-3,country_id,Code_comm,code_id
0,SYC,1998,890200,Import,1431426.0,0.0,23000.0,Number of items,SYC,155,890200,6928
1,SYC,1998,890310,Import,31406.0,0.0,2545.0,Number of items,SYC,155,890310,6930
2,SYC,1998,890310,Export,950.0,0.0,300.0,Number of items,SYC,155,890310,6930
3,SYC,1998,890310,Re-Export,950.0,0.0,300.0,Number of items,SYC,155,890310,6930
4,SYC,1998,890391,Import,18251.0,0.0,450.0,Number of items,SYC,155,890391,6931
...,...,...,...,...,...,...,...,...,...,...,...,...
7040056,SYC,1999,890590,Import,3399.0,0.0,283.0,Number of items,SYC,155,890590,6939
7040057,SYC,1999,890600,Import,816.0,0.0,199.0,Number of items,SYC,155,890600,6940
7040058,SYC,1999,890710,Import,31387.0,0.0,1325.0,Number of items,SYC,155,890710,6944
7040059,SYC,1999,890790,Import,8749.0,0.0,1566.0,Number of items,SYC,155,890790,6945


In [18]:
def create_df_with_unique_values(unique_column_data, index_colum_name):
    index = 1
    list_of_index = []
    for _ in unique_column_data:
        list_of_index.append(index)
        index = index + 1

    return pd.DataFrame({index_colum_name: list_of_index, "value": unique_column_data})

In [19]:
df_quantity_name = create_df_with_unique_values(df_clean_trades["quantity_name"].unique(), "quantity_id")
df_quantity_name.head()

Unnamed: 0,quantity_id,value
0,1,Number of items
1,2,Weight in kilograms
2,3,No Quantity
3,4,Volume in litres
4,5,Number of pairs


In [20]:
df_flow = create_df_with_unique_values(df_clean_trades["flow"].unique(), "flow_id")
df_flow.head()

Unnamed: 0,flow_id,value
0,1,Import
1,2,Export
2,3,Re-Export
3,4,Re-Import


In [21]:
df_year = create_df_with_unique_values(df_clean_trades["year"].unique(), "year_id")
df_year.head()

Unnamed: 0,year_id,value
0,1,1998
1,2,1997
2,3,1996
3,4,1995
4,5,1994


In [22]:
df_clean_trades = df_clean_trades.merge(right=df_flow, left_on="flow", right_on="value", how='left')
#df_clean_trades = df_clean_trades.merge(right=df_year, left_on="year", right_on="value", how='left')
#df_clean_trades = df_clean_trades.merge(right=df_quantity_name, left_on="quantity_name", right_on="value", how='left')
df_clean_trades.head()

Unnamed: 0,country_code,year,comm_code,flow,trade_usd,kg,quantity,quantity_name,alpha-3,country_id,Code_comm,code_id,flow_id,value
0,SYC,1998,890200,Import,1431426.0,0.0,23000.0,Number of items,SYC,155,890200,6928,1,Import
1,SYC,1998,890310,Import,31406.0,0.0,2545.0,Number of items,SYC,155,890310,6930,1,Import
2,SYC,1998,890310,Export,950.0,0.0,300.0,Number of items,SYC,155,890310,6930,2,Export
3,SYC,1998,890310,Re-Export,950.0,0.0,300.0,Number of items,SYC,155,890310,6930,3,Re-Export
4,SYC,1998,890391,Import,18251.0,0.0,450.0,Number of items,SYC,155,890391,6931,1,Import


In [23]:
df_clean_trades = df_clean_trades.merge(right=df_year, left_on="year", right_on="value", how='left')
df_clean_trades.head()

Unnamed: 0,country_code,year,comm_code,flow,trade_usd,kg,quantity,quantity_name,alpha-3,country_id,Code_comm,code_id,flow_id,value_x,year_id,value_y
0,SYC,1998,890200,Import,1431426.0,0.0,23000.0,Number of items,SYC,155,890200,6928,1,Import,1,1998
1,SYC,1998,890310,Import,31406.0,0.0,2545.0,Number of items,SYC,155,890310,6930,1,Import,1,1998
2,SYC,1998,890310,Export,950.0,0.0,300.0,Number of items,SYC,155,890310,6930,2,Export,1,1998
3,SYC,1998,890310,Re-Export,950.0,0.0,300.0,Number of items,SYC,155,890310,6930,3,Re-Export,1,1998
4,SYC,1998,890391,Import,18251.0,0.0,450.0,Number of items,SYC,155,890391,6931,1,Import,1,1998


In [24]:
df_clean_trades = df_clean_trades.merge(right=df_quantity_name, left_on="quantity_name", right_on="value", how='left')
df_clean_trades.head()

Unnamed: 0,country_code,year,comm_code,flow,trade_usd,kg,quantity,quantity_name,alpha-3,country_id,Code_comm,code_id,flow_id,value_x,year_id,value_y,quantity_id,value
0,SYC,1998,890200,Import,1431426.0,0.0,23000.0,Number of items,SYC,155,890200,6928,1,Import,1,1998,1,Number of items
1,SYC,1998,890310,Import,31406.0,0.0,2545.0,Number of items,SYC,155,890310,6930,1,Import,1,1998,1,Number of items
2,SYC,1998,890310,Export,950.0,0.0,300.0,Number of items,SYC,155,890310,6930,2,Export,1,1998,1,Number of items
3,SYC,1998,890310,Re-Export,950.0,0.0,300.0,Number of items,SYC,155,890310,6930,3,Re-Export,1,1998,1,Number of items
4,SYC,1998,890391,Import,18251.0,0.0,450.0,Number of items,SYC,155,890391,6931,1,Import,1,1998,1,Number of items


In [25]:
df_clean_trades["trade_id"] = df_clean_trades.index + 1
df_clean_trades.head()

Unnamed: 0,country_code,year,comm_code,flow,trade_usd,kg,quantity,quantity_name,alpha-3,country_id,Code_comm,code_id,flow_id,value_x,year_id,value_y,quantity_id,value,trade_id
0,SYC,1998,890200,Import,1431426.0,0.0,23000.0,Number of items,SYC,155,890200,6928,1,Import,1,1998,1,Number of items,1
1,SYC,1998,890310,Import,31406.0,0.0,2545.0,Number of items,SYC,155,890310,6930,1,Import,1,1998,1,Number of items,2
2,SYC,1998,890310,Export,950.0,0.0,300.0,Number of items,SYC,155,890310,6930,2,Export,1,1998,1,Number of items,3
3,SYC,1998,890310,Re-Export,950.0,0.0,300.0,Number of items,SYC,155,890310,6930,3,Re-Export,1,1998,1,Number of items,4
4,SYC,1998,890391,Import,18251.0,0.0,450.0,Number of items,SYC,155,890391,6931,1,Import,1,1998,1,Number of items,5


### Sorting Columns of Joined DataFrames

In [26]:
df_country_data = df_country_data[['country_id', 'alpha-3', 'country', 'region', 'sub-region']]
df_clean_codes = df_clean_codes[['code_id', 'Code_comm', 'Description_complex', 'Description']]



### Renaming codes and country data

In [27]:
df_clean_codes = df_clean_codes.rename(columns={
  'Code_comm':'code', 'Description_complex': 'description', 'Description':'parent_description'
}) 
df_country_data = df_country_data.rename(columns={
  'alpha-3':'alpha_3',
  'sub-region':'sub_region'
})



### Cleaning df_trades

In [40]:
df_final_trades = df_clean_trades[["trade_id", "trade_usd", "kg", "quantity", "code_id", "quantity_id", "flow_id", "year_id","country_id"]].copy()
df_final_trades.head()

Unnamed: 0,trade_id,trade_usd,kg,quantity,code_id,quantity_id,flow_id,year_id,country_id
0,1,1431426.0,0.0,23000.0,6928,1,1,1,155
1,2,31406.0,0.0,2545.0,6930,1,1,1,155
2,3,950.0,0.0,300.0,6930,1,2,1,155
3,4,950.0,0.0,300.0,6930,1,3,1,155
4,5,18251.0,0.0,450.0,6931,1,1,1,155


In [41]:
df_final_trades.to_csv('target/trades.csv',index=False, sep = "|")
#df_clean_codes.to_csv('target/codes.csv', index=False, sep = "|")
#df_flow.to_csv('target/flow.csv', index=False, sep = "|")
#df_country_data.to_csv('target/countries.csv', index=False, sep = "|")
#df_quantity_name.to_csv('target/quantity.csv', index=False, sep = "|")
#df_year.to_csv('target/year.csv', index=False, sep = "|")


In [30]:
import boto3
import os
import redshift_connector

def load_to_s3( full_filename ):
 session = boto3.Session(
   
    aws_access_key_id = os.environ['AWS_ACCESS_KEY_ID'],
    aws_secret_access_key= os.environ['AWS_SECRET_ACCESS_KEY']
  )
  
 s3 = session.client(
    service_name = 's3',
    region_name = 'us-west-2',
 )
 
 #table = file_name.split('|')[0]
 path = f"./target/{full_filename}"
 bucket_name = "trades-s3"
 try:
  print(f"file {full_filename} loaded to s3")
  s3.upload_file(path, bucket_name, f"{full_filename}")
 except Exception as e:
   print(f"error: {e}")  

In [31]:
def copy_from_s3_to_redshift (bucket, full_filename):
  connection = redshift_connector.connect(
    host = os.environ['AWS_REDSHIFT_HOST'],
    database = os.environ['AWS_DATABASE'],
    port = 5439,
    user = os.environ['AWS_USER'],
    password = os.environ['AWS_PASSWORD']
  )
  cursor = connection.cursor()
  
  table_name= full_filename.split('.')[0]
  
  sql = f""" COPY public.{table_name}
             FROM 's3://{bucket}/{full_filename}'
             CSV
             DELIMITER '|'
             IGNOREHEADER 1
             REGION 'us-west-2'
             CREDENTIALS 'aws_access_key_id={os.environ['AWS_ACCESS_KEY_ID']};aws_secret_access_key={os.environ['AWS_SECRET_ACCESS_KEY']}'
             """
  try:
   cursor.execute(sql)
   connection.commit() 
   print(f"table {table_name} copied")
  except Exception as e:
   print(e)
  finally:
    cursor.close()
    connection.close()

In [33]:
files_to_load= os.listdir('./target')


for full_filename in files_to_load:
  load_to_s3(full_filename)
  copy_from_s3_to_redshift("trades-s3", full_filename)  

file year.csv loaded to s3
{'S': 'ERROR', 'C': 'XX000', 'M': 'Cannot COPY into nonexistent table year', 'F': '../src/sys/compression_analyzer.cpp', 'L': '669', 'R': 'XenGetLoadTableId'}
file quantity.csv loaded to s3
table quantity copied
file countries.csv loaded to s3
table countries copied
file flow.csv loaded to s3
table flow copied
file trades.csv loaded to s3
{'S': 'ERROR', 'C': 'XX000', 'M': "Load into table 'trades' failed.  Check 'sys_load_error_detail' system table for details.", 'F': '../src/pg/src/backend/commands/commands_copy.c', 'L': '741', 'R': 'CheckMaxRowError'}
file codes.csv loaded to s3
table codes copied


In [42]:
load_to_s3("trades.csv")
copy_from_s3_to_redshift("trades-s3", "trades.csv")

file trades.csv loaded to s3
table trades copied


In [34]:
files_to_load= os.listdir('./target')
files_to_load

['year.csv',
 'quantity.csv',
 'countries.csv',
 'flow.csv',
 'trades.csv',
 'codes.csv']