In [52]:
import configparser
from pathlib import Path
import os
from psycopg2 import sql
import psycopg2

In [53]:
# __vsc_ipynb_file__ holds this notebook's path when run under the VS Code
# Jupyter extension (NOTE(review): it is undefined in other frontends — confirm
# if portability matters). config.conf sits in the parent of the notebook's
# directory.
path = Path(__vsc_ipynb_file__)
ROOT_DIR = path.parent.parent.absolute()
config_path = os.path.join(ROOT_DIR, "config.conf")

config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually read; fail fast here rather than with a confusing
# NoSectionError on the first config.get() call.
if not config.read(config_path):
    raise FileNotFoundError(f"Could not read Redshift config file: {config_path}")


['c:\\Users\\hares\\singaporesub\\config.conf']

In [54]:
# Redshift connection settings, read in this order from the [redshift_config]
# section of config.conf (all values are kept as strings).
USERNAME, PASSWORD, HOST, PORT, DATABASE = (
    config.get("redshift_config", option)
    for option in (
        "redshift_username",
        "redshift_password",
        "redshift_hostname",
        "redshift_port",
        "redshift_database",
    )
)


In [55]:
# date_dim: calendar date dimension, replicated to every node (DISTSTYLE ALL)
# and sorted on date_id. Columns are grouped by grain (date, year, quarter,
# month, week, day).
sql_create_date_dim = sql.SQL("""
CREATE TABLE date_dim (
  -- DATE
  "date_id"               INTEGER                     NOT NULL PRIMARY KEY,
  "full_date"             DATE                        NOT NULL,
  "au_format_date"        CHAR(10)                    NOT NULL,
  "us_format_date"        CHAR(10)                    NOT NULL,
  -- YEAR
  "year_number"           SMALLINT                    NOT NULL,
  "year_week_number"      SMALLINT                    NOT NULL,
  "year_day_number"       SMALLINT                    NOT NULL,
  "au_fiscal_year_number" SMALLINT                    NOT NULL,
  "us_fiscal_year_number" SMALLINT                    NOT NULL,
  -- QUARTER
  "qtr_number"            SMALLINT                    NOT NULL,
  "au_fiscal_qtr_number"  SMALLINT                    NOT NULL,
  "us_fiscal_qtr_number"  SMALLINT                    NOT NULL,
  -- MONTH
  "month_number"          SMALLINT                    NOT NULL,
  "month_name"            CHAR(9)                     NOT NULL,
  "month_day_number"      SMALLINT                    NOT NULL,
  -- WEEK
  "week_day_number"       SMALLINT                    NOT NULL,
  -- DAY
  "day_name"              CHAR(9)                     NOT NULL,
  "day_is_weekday"        SMALLINT                    NOT NULL,
  "day_is_last_of_month"  SMALLINT                    NOT NULL
)
DISTSTYLE ALL SORTKEY (date_id);
""")

In [56]:
# Populate date_dim with one row per day from 2011-01-01 onward: offsets
# 0..20*365 inclusive (7301 days, ~20 years; leap days are not compensated for).
# date_id is the 1-based offset, so date_id 1 is 2011-01-01.
# Fiscal years/quarters are derived by shifting the date forward: +6 months
# gives a Jul-Jun year labelled by its ending year (au_*), +3 months gives an
# Oct-Sep year labelled by its ending year (us_*).
# day_is_weekday treats to_char 'D' values 1 and 7 (Sunday, Saturday) as weekend.
# NOTE(review): Redshift documents generate_series as a leader-node-only
# function; INSERT ... SELECT FROM generate_series into a user table may be
# rejected ("not supported on Redshift tables"). If so, generate the day
# offsets from a numbers table instead — TODO confirm on the target cluster.
sql_insert_date_data = sql.SQL("""
INSERT INTO date_dim
  SELECT
    cast(seq + 1 AS INTEGER)                                      AS date_id,-- DATE
    datum                                                         AS full_date,
    TO_CHAR(datum, 'DD/MM/YYYY') :: CHAR(10)                      AS au_format_date,
    TO_CHAR(datum, 'MM/DD/YYYY') :: CHAR(10)                      AS us_format_date,-- YEAR
    cast(extract(YEAR FROM datum) AS SMALLINT)                    AS year_number,
    cast(extract(WEEK FROM datum) AS SMALLINT)                    AS year_week_number,
    cast(extract(DOY FROM datum) AS SMALLINT)                     AS year_day_number,
    cast(to_char(datum + INTERVAL '6' MONTH, 'yyyy') AS SMALLINT) AS au_fiscal_year_number,
    cast(to_char(datum + INTERVAL '3' MONTH, 'yyyy') AS SMALLINT) AS us_fiscal_year_number,-- QUARTER
    cast(to_char(datum, 'Q') AS SMALLINT)                         AS qtr_number,
    cast(to_char(datum + INTERVAL '6' MONTH, 'Q') AS SMALLINT)    AS au_fiscal_qtr_number,
    cast(to_char(datum + INTERVAL '3' MONTH, 'Q') AS SMALLINT)    AS us_fiscal_qtr_number,-- MONTH
    cast(extract(MONTH FROM datum) AS SMALLINT)                   AS month_number,
    to_char(datum, 'Month')                                       AS month_name,
    cast(extract(DAY FROM datum) AS SMALLINT)                     AS month_day_number,-- WEEK
    cast(to_char(datum, 'D') AS SMALLINT)                         AS week_day_number,-- DAY
    to_char(datum, 'Day')                                         AS day_name,
    CASE WHEN to_char(datum, 'D') IN ('1', '7')
      THEN 0
    ELSE 1 END                                                    AS day_is_weekday,
    CASE WHEN
      extract(DAY FROM (datum + (1 - extract(DAY FROM datum)) :: INTEGER +
                        INTERVAL '1' MONTH) :: DATE -
                       INTERVAL '1' DAY) = extract(DAY FROM datum)
      THEN 1
    ELSE 0 END                                                    AS day_is_last_of_month
  FROM
    -- Generate days for the next ~20 years starting from 2011.
    (
      SELECT
        '2011-01-01' :: DATE + generate_series AS datum,
        generate_series                        AS seq
      FROM generate_series(0, 20 * 365, 1)
    ) DQ
  ORDER BY 1;

""")

In [74]:
# post_fact: one row per Reddit post, keyed by post_id and linked to Author_Dim.
# Redshift silently converts TEXT columns to VARCHAR(256), which is too short
# for post bodies, long titles and URLs (COPY then fails with "String length
# exceeds DDL length"), so text columns get explicit widths. Redshift VARCHAR
# lengths are in bytes, hence VARCHAR(1200) for titles of up to 300 possibly
# multi-byte characters (Reddit's title limit).
# Primary/foreign keys are informational only in Redshift (not enforced), but
# the referenced Author_Dim table must exist before this CREATE runs.
sql_create_post_fact = sql.SQL("""
CREATE TABLE post_fact (
    "post_id"             VARCHAR(10)  NOT NULL,  -- widened from 6: newer Reddit ids are 7 base36 chars (verify)
    "author_id"           VARCHAR(10),
    "created_time"        TIMESTAMPTZ  NOT NULL,
    "flair"               VARCHAR(120) NOT NULL,
    "title"               VARCHAR(1200),
    "body"                VARCHAR(MAX) DEFAULT NULL,
    "num_comments"        INT,                    -- SMALLINT would overflow above 32767
    "upvote_ratio"        DECIMAL(3,2) NOT NULL,
    "score"               INT          NOT NULL,
    "url"                 VARCHAR(MAX),
    "distinguised"        BOOLEAN      DEFAULT FALSE,  -- sic: spelling must match the post COPY column list
    "is_original_content" BOOLEAN      DEFAULT FALSE,
    "over_18"             BOOLEAN      DEFAULT FALSE,
    PRIMARY KEY (post_id),
    FOREIGN KEY (author_id) REFERENCES Author_Dim (author_id)
)
""")

In [75]:
# Author_Dim: one row per Reddit author, keyed by author_id. post_fact.author_id
# declares a FOREIGN KEY to this table, so create it before post_fact.
# NOTE(review): "link_flair" is declared INT next to comment_karma — if it is
# meant to hold link karma rather than a flair, consider renaming it (the
# author COPY column list would need the same change). Verify against the CSV.
sql_create_author_dim = sql.SQL("""
CREATE TABLE Author_Dim (
    "author_id" VARCHAR(10) NOT NULL ,
    "acc_creation_date" TIMESTAMPTZ NOT NULL ,
    "author_name" VARCHAR(20) ,
    "link_flair" INT,
    "comment_karma" INT,
    "is_gold" BOOLEAN default False,
    "is_mod" BOOLEAN default False,
    "is_employee" BOOLEAN default False,
    PRIMARY KEY(author_id)
)
""")

In [59]:
# Bulk-load Author_Dim from S3 with COPY. The explicit column list maps the
# CSV's column order onto the table, and the header row is skipped.
# Authentication uses the IAM_ROLE form, matching the post_fact COPY, instead
# of the legacy CREDENTIALS 'aws_iam_role=...' string.
# NOTE(review): the bucket path and IAM role ARN are hard-coded; consider
# moving them into config.conf next to the Redshift settings.
sql_copy_author_to_redshift = sql.SQL("""
COPY Author_Dim (author_id, author_name, link_flair, comment_karma, acc_creation_date, is_gold, is_mod, is_employee)
FROM 's3://initial-data-load-bucket/test_author_12.csv'
IAM_ROLE 'arn:aws:iam::075207432376:role/REDHSIFTACESS'
FORMAT AS CSV DELIMITER ',' QUOTE '"'
REGION AS 'ap-southeast-1'
IGNOREHEADER 1;
""")

In [92]:
# Bulk-load post_fact from S3 with COPY. The column list gives the CSV's column
# order, which differs from the table's declared order, and the header row is
# skipped. The statement is terminated with ';' like the author COPY.
# NOTE(review): the bucket path and IAM role ARN are hard-coded. A value wider
# than its column makes the whole COPY fail; inspect stl_load_errors if it does.
sql_copy_post_redshift = sql.SQL("""
COPY post_fact (title, body, post_id, score, upvote_ratio, flair, created_time, num_comments, author_id, url, distinguised, is_original_content, over_18)
FROM 's3://initial-data-load-bucket/test_posts_12_null.csv'
IAM_ROLE 'arn:aws:iam::075207432376:role/REDHSIFTACESS'
FORMAT AS CSV DELIMITER ',' QUOTE '"'
REGION AS 'ap-southeast-1'
IGNOREHEADER 1;
""")

In [61]:
def connect_to_redshift():
    """Open an autocommit psycopg2 connection to the Redshift warehouse.

    Connection parameters come from the module-level DATABASE, USERNAME,
    PASSWORD, HOST and PORT values read from config.conf.

    Returns:
        An open connection with ``autocommit`` enabled, so each statement is
        committed as soon as it executes.

    On any error while connecting, the error is printed to stderr and the
    process exits with status 1 (SystemExit).
    """
    # Local import: sys is needed for the error path and is not otherwise in
    # scope, which previously turned every connection failure into a NameError.
    import sys

    try:
        rs_wh = psycopg2.connect(
            dbname=DATABASE, user=USERNAME, password=PASSWORD, host=HOST, port=PORT
        )
        rs_wh.autocommit = True
    except Exception as e:
        print(f"Unable to connect to Redshift. Error {e}", file=sys.stderr)
        sys.exit(1)
    return rs_wh

In [76]:
def create_and_load_date_dimension(rs_wh):
    """Create the ``date_dim`` table and populate it with one row per day.

    Args:
        rs_wh: An open psycopg2 connection to the Redshift warehouse.

    Both statements run inside the connection's context manager (commit on
    success / rollback on error when not in autocommit mode; the connection
    itself stays open). The cursor is closed when the block exits.
    """
    with rs_wh, rs_wh.cursor() as cur:
        cur.execute(sql_create_date_dim)
        print("created date dim")
        cur.execute(sql_insert_date_data)
        print("inserted date dim")
        


In [63]:
def create_post_facttable(rs_wh):
    """Create the ``post_fact`` table.

    Args:
        rs_wh: An open psycopg2 connection to the Redshift warehouse.

    ``post_fact`` declares a foreign key to ``Author_Dim``, so create the
    author dimension first. The cursor is closed when the block exits; the
    connection stays open.
    """
    with rs_wh, rs_wh.cursor() as cur:
        cur.execute(sql_create_post_fact)

In [64]:
def create_post_author_dim(rs_wh):
    """Create the ``Author_Dim`` table.

    Args:
        rs_wh: An open psycopg2 connection to the Redshift warehouse.

    The cursor is closed when the block exits; the connection stays open.
    """
    with rs_wh, rs_wh.cursor() as cur:
        cur.execute(sql_create_author_dim)

In [65]:
def insertdata_dim_dimtable(rs_wh):
    """Bulk-load ``Author_Dim`` from S3 by running the author COPY statement.

    Args:
        rs_wh: An open psycopg2 connection to the Redshift warehouse.

    The cursor is closed when the block exits; the connection stays open.
    """
    with rs_wh, rs_wh.cursor() as cur:
        cur.execute(sql_copy_author_to_redshift)

In [66]:
def insertdata_post_facttable(rs_wh):
    """Bulk-load ``post_fact`` from S3 by running the post COPY statement.

    Args:
        rs_wh: An open psycopg2 connection to the Redshift warehouse.

    The cursor is closed when the block exits; the connection stays open.
    """
    with rs_wh, rs_wh.cursor() as cur:
        # The COPY statement is defined as sql_copy_post_redshift; the previous
        # name sql_copy_s3_to_redshift does not exist (NameError).
        cur.execute(sql_copy_post_redshift)

In [93]:
def main(setup=False):
    """Load the Reddit posts CSV from S3 into ``post_fact``.

    Args:
        setup: When True, first run the one-time setup in dependency order:
            create ``Author_Dim``, COPY the authors into it, then create
            ``post_fact`` (which references ``Author_Dim``). Defaults to False,
            which only runs the post COPY.

    The connection is closed once the statements have run. Because
    ``connect_to_redshift`` enables autocommit, each statement is already
    committed by then.
    """
    rs_wh = connect_to_redshift()
    try:
        with rs_wh.cursor() as cur:
            if setup:
                cur.execute(sql_create_author_dim)
                cur.execute(sql_copy_author_to_redshift)
                cur.execute(sql_create_post_fact)
            cur.execute(sql_copy_post_redshift)
    finally:
        rs_wh.close()

In [94]:
# A notebook kernel executes cells with __name__ set to "__main__", so running
# this cell starts the Redshift load.
if "__main__" == __name__:
    main()