In [1]:
import pandas as pd

In [2]:
# CONFIGS
# Load the AWS credentials and Redshift connection settings from app.cfg.

import configparser

config = configparser.ConfigParser()
config.read('app.cfg')

# [AWS] section: credentials handed to the S3 client.
AWS_ACCESS_KEY, AWS_ACCESS_SECRET_KEY = (
    config.get('AWS', option)
    for option in ('ACCESS_KEY', 'SECRET_ACCESS_KEY')
)

# [REDSHIFT] section: connection parameters (all read as strings, port included).
RED_HOST, RED_DBNAME, RED_USER, RED_PASSWORD, RED_PORT = (
    config.get('REDSHIFT', option)
    for option in ('HOST', 'DBNAME', 'USER', 'PASSWORD', 'PORT')
)

In [3]:
# read file from S3
# Open the project bucket and list the keys of the objects it currently holds.
import boto3

s3 = boto3.resource(
    's3',
    region_name="us-east-1",
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_ACCESS_SECRET_KEY,
)
bucket = s3.Bucket('final.project.krazy.gongryong')

# One entry per object in the bucket; only the object key is kept.
song_data_files = [s3_object.key for s3_object in bucket.objects.all()]

# Display at most the first ten keys as the cell output.
song_data_files[:10]

['Country.csv',
 'GDP.csv',
 'country.csv',
 'covid_date.csv',
 'full-list-total-tests-for-covid-19.csv']

In [4]:
# upload covid data file
# Read the original COVID testing CSV, convert its `day` column to datetime,
# write the cleaned copy locally and upload that copy to the S3 bucket.

csv_file_covid_origin = 'data/full-list-total-tests-for-covid-19-origin.csv'
csv_file_covid = 'data/full-list-total-tests-for-covid-19.csv'

df_covid = pd.read_csv(csv_file_covid_origin)
df_covid = df_covid.assign(day=pd.to_datetime(df_covid['day']))

# index=False keeps the pandas row index out of the written file.
df_covid.to_csv(csv_file_covid, index=False)
bucket.upload_file(
    Filename=csv_file_covid,
    Key='full-list-total-tests-for-covid-19.csv',
)

df_covid.head(5)

Unnamed: 0,Entity,Code,day,total_tests,annotations
0,Albania,ALB,2020-02-25,8,tests performed
1,Albania,ALB,2020-02-26,13,tests performed
2,Albania,ALB,2020-02-27,17,tests performed
3,Albania,ALB,2020-02-28,18,tests performed
4,Albania,ALB,2020-02-29,26,tests performed


In [5]:
# upload date data
# Build the date dimension from the distinct days present in the COVID data,
# write it to a local CSV and upload that CSV to the S3 bucket.
csv_file_covid_date = 'data/covid_date.csv'

df_covid_date = pd.DataFrame()
# One row per distinct calendar day found in df_covid.
df_covid_date['date'] = pd.to_datetime(df_covid['day']).drop_duplicates()

# Calendar attributes derived from the date via the pandas .dt accessor.
df_covid_date['year'] = df_covid_date['date'].dt.year
df_covid_date['month'] = df_covid_date['date'].dt.month
df_covid_date['day'] = df_covid_date['date'].dt.day
df_covid_date['weekday'] = df_covid_date['date'].dt.weekday  # Monday=0 ... Sunday=6
df_covid_date['day_of_year'] = df_covid_date['date'].dt.dayofyear

# Write the frame before uploading: without this step the upload would send
# whatever stale file is on disk, or fail if the file does not exist yet.
# index=False keeps the pandas row index out of the file.
df_covid_date.to_csv(csv_file_covid_date, index=False)
bucket.upload_file(csv_file_covid_date, 'covid_date.csv')

df_covid_date.head(5)

Unnamed: 0,date,year,month,day,weekday,day_of_year
0,2020-02-25,2020,2,25,1,56
1,2020-02-26,2020,2,26,2,57
2,2020-02-27,2020,2,27,3,58
3,2020-02-28,2020,2,28,4,59
4,2020-02-29,2020,2,29,5,60


In [6]:
# transform & upload country data to s3
# Select the country columns of interest, rename them to lower-case names,
# cap the text columns at 250 characters, then write and upload the result.

csv_file_country_origin = 'data/CountryOrigin.csv'
csv_file_country = 'data/country.csv'
df_country_origin = pd.read_csv(csv_file_country_origin)

# Output column -> source column, for the columns that get truncated.
truncated_columns = {
    'shortname': 'ShortName',
    'tablename': 'TableName',
    'longname': 'LongName',
    'alpha2code': 'Alpha2Code',
    'currencyunit': 'CurrencyUnit',
    'specialnotes': 'SpecialNotes',
    'region': 'Region',
    'incomegroup': 'IncomeGroup',
    'wb2code': 'Wb2Code',
}

# The country code is copied as-is; every other column keeps at most 250 characters.
df_country = pd.DataFrame({
    'countrycode': df_country_origin['CountryCode'],
    **{
        target: df_country_origin[source].str[:250]
        for target, source in truncated_columns.items()
    },
})

df_country.to_csv(csv_file_country, index=False)
bucket.upload_file(csv_file_country, 'country.csv')

In [7]:
# create tables
# Runs the local create_tables.py script through IPython's %run magic.
# The -i flag executes the script in the notebook's own namespace rather than
# an empty one -- presumably so it can reuse the config values defined in an
# earlier cell; confirm against create_tables.py.

%run -i 'create_tables.py'

DROP TABLE IF EXISTS covid_events
DROP TABLE IF EXISTS gdp_dim
DROP TABLE IF EXISTS country_dim
DROP TABLE IF EXISTS covid_date_dim

CREATE TABLE IF NOT EXISTS covid_events(
  entity          VARCHAR,
  code            VARCHAR,
  day       VARCHAR,
  total_tests          INTEGER,
  annotations   VARCHAR
);


CREATE TABLE IF NOT EXISTS country_dim(
  countrycode          VARCHAR,
  shortname            VARCHAR,
  tablename       VARCHAR,
  longname          VARCHAR,
  alpha2code   VARCHAR,
  currencyunit   VARCHAR,
  specialnotes   VARCHAR,
  region   VARCHAR,
  incomegroup   VARCHAR,
  wb2code   VARCHAR,
  PRIMARY KEY(countrycode)
);


CREATE TABLE IF NOT EXISTS gdp_dim(
  country     VARCHAR,
  ranking          INTEGER,
  economy          VARCHAR,
  dollars          INTEGER,
  PRIMARY KEY(country)
);


CREATE TABLE IF NOT EXISTS covid_date_dim(
  date        VARCHAR,
  year        INTEGER,
  month       INTEGER,
  day         INTEGER,
  weekday     INTEGER,
  day_of_year INTEGER,
  PR

In [8]:
# load data
# Runs the local load_data.py script through IPython's %run magic.
# The -i flag executes the script in the notebook's own namespace rather than
# an empty one -- presumably so it can reuse the config values defined in an
# earlier cell; confirm against load_data.py.

%run -i 'load_data.py'


    COPY covid_events FROM 's3://final.project.krazy.gongryong/full-list-total-tests-for-covid-19.csv'
    IAM_ROLE 'arn:aws:iam::032073717423:role/dwhRole'
    csv
    IGNOREHEADER 1;


    COPY country_dim
    FROM 's3://final.project.krazy.gongryong/country.csv'
    IAM_ROLE 'arn:aws:iam::032073717423:role/dwhRole'
    csv
    IGNOREHEADER 1;


    COPY covid_date_dim FROM 's3://final.project.krazy.gongryong/covid_date.csv'
    IAM_ROLE 'arn:aws:iam::032073717423:role/dwhRole'
    csv
    IGNOREHEADER 1;


    COPY gdp_dim FROM 's3://final.project.krazy.gongryong/GDP.csv'
    IAM_ROLE 'arn:aws:iam::032073717423:role/dwhRole'
    csv
    IGNOREHEADER 1;



In [36]:
# query data
# Runs two data-quality checks against the loaded Redshift tables and, only
# when both pass, executes the analytical query and prints its result.

import psycopg2

# Check 1: number of (entity, code, day) groups in covid_events that occur
# more than once. Zero means there is no duplicated event log.
duplicated_event_sql = """
select 
  count(*) 
from 
  (
    select 
      count(*) as count_g 
    from 
      public.covid_events 
    group by 
      entity, 
      code, 
      day
  ) 
where 
  count_g > 1;
"""

# Check 2: number of rows left unmatched on either side when covid_events is
# joined to covid_date_dim on the day. Zero means every event day has a date
# row and every date row has at least one event.
unmatched_date_sql = """
SELECT count(*) FROM covid_events ce
FULL OUTER JOIN covid_date_dim cd
ON ce.day = cd.date
WHERE ce.day IS NULL
OR cd.date IS NULL
"""

# get the total test results of the countries in asia in top 100 GDP
# The query additionally restricts rows to year 2020 and weekday in (5, 6).
# NOTE(review): `ranking < 100` excludes rank 100 itself, and
# `like '%Asia%'` matches any region whose name contains "Asia" -- confirm
# both are intended.
weekend_tests_sql = """
    select ce.code, sum(ce.total_tests) as total_tests  From covid_events as ce
    left join covid_date_dim as cdd on ce.day = cdd.date
    left join gdp_dim as gd on ce.code = gd.country
    left join country_dim as cd on ce.code = cd.countrycode
    where gd.ranking < 100
    and cdd.year = 2020
    and cdd.weekday in (5, 6)
    and cd.region like '%Asia%'
    group by ce.code
    order by total_tests DESC
    """

conn = psycopg2.connect(dbname=RED_DBNAME, host=RED_HOST, port=RED_PORT, user=RED_USER, password=RED_PASSWORD)

# Each flag stays True while its check finds no offending rows.
no_duplicated_event_log = True
no_normalized_timestamp = True

try:
    # The cursor is closed when the with-block exits, even on error.
    with conn.cursor() as cur:
        # data quality check
        # 1. there is no duplicated event log
        cur.execute(duplicated_event_sql)
        duplicated_count = cur.fetchone()[0]
        if duplicated_count > 0:
            print('there are duplicated covid event data')
            print(duplicated_count)
            # Clear the flag that belongs to this check so the gate below sees it.
            no_duplicated_event_log = False

        # 2. check if there is no normalized time data
        cur.execute(unmatched_date_sql)
        unmatched_count = cur.fetchone()[0]
        if unmatched_count > 0:
            print('there are event data with un-normalized timestamp')
            print(unmatched_count)
            no_normalized_timestamp = False

        # Only run the analytical query when BOTH checks passed.
        if no_duplicated_event_log and no_normalized_timestamp:
            cur.execute(weekend_tests_sql)
            result = pd.DataFrame(cur.fetchall())
            print(result)
        else:
            print('Data quality check falied.')
finally:
    # Always release the connection, including when a query raises.
    conn.close()


      0           1
0   IND  4167336349
1   RUS  2835338454
2   GBR  1038130568
3   ITA   773935667
4   TUR   631095353
5   DEU   532997526
6   DNK   238672673
7   AUS   238379973
8   BEL   210104817
9   POL   207153488
10  PAK   202253190
11  PHL   189487915
12  KAZ   176842948
13  PRT   171489436
14  KOR   134780570
15  MYS   117907799
16  BGD   108412510
17  JPN   108068858
18  AUT   105975855
19  IDN    91595968
20  UKR    83414157
21  GRC    76058520
22  NLD    73990571
23  IRL    72570051
24  NOR    69382603
25  CHE    68391067
26  SRB    66743942
27  FIN    65776459
28  THA    64129404
29  NPL    56707812
30  HUN    50569716
31  LTU    48770240
32  SVK    48204855
33  LUX    48024130
34  NZL    47310292
35  BGR    30754373
36  BLR    30325572
37  MMR    26768287
38  LKA    24383611
39  LVA    22568534
40  HRV    19599023
41  SVN    17292200
42  VNM    15279657
43  HKG     4031954
44  SGP      246254
