# Data Ingestion

## Copy CSV files to S3

In [2]:
# Assume all the pre-requisites were set up 
%store -r setup_instance_check_passed
%store -r setup_dependencies_passed
%store -r setup_s3_bucket_passed
%store -r setup_iam_roles_passed


In [3]:
# Load Libraries
import boto3
import sagemaker
import pandas as pd


sess = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name
account_id = boto3.client("sts").get_caller_identity().get("Account")

sm = boto3.Session().client(service_name="sagemaker", region_name=region)

In [56]:
# Public Data
!aws s3 ls s3://ads508-staggs/

# Copy Datasets from :
# Source of data
s3_public_path_csv = "s3://ads508-staggs/"

# Destination Data:
s3_private_path_csv = "s3://{}/ads508-staggs/private".format(bucket)

# Store path
%store s3_private_path_csv

#s3_private_path_tweeter = "s3://{}/ads508-staggs/tweeter/".format(bucket)
#s3_private_path_nyt = "s3://{}/ads508-staggs/nyt/".format(bucket)


# Copy datasets
!aws s3 cp --recursive $s3_public_path_csv/ $s3_private_path_csv/ --exclude "*" --include "hashtag_donaldtrump.csv"
!aws s3 cp --recursive $s3_public_path_csv/ $s3_private_path_csv/ --exclude "*" --include "hashtag_joebiden.csv"
!aws s3 cp --recursive $s3_public_path_csv/ $s3_private_path_csv/ --exclude "*" --include "nyt-comments-2020.csv"


2024-03-16 21:27:21  483859793 hashtag_donaldtrump.csv
2024-03-16 21:27:21  380820416 hashtag_joebiden.csv
2024-03-16 21:27:21 3066945799 nyt_comments_2020.csv
Stored 's3_private_path_csv' (str)


In [15]:
print(s3_private_path_csv)

s3://sagemaker-us-east-1-851725336500/ads508-staggs/


In [57]:
!aws s3 ls $s3_private_path_csv

## Create Database, Tables and Parquets
### Database

In [5]:
# Setup and check pre-requisites to create Database
ingest_create_athena_db_passed = False

from pyathena import connect

In [46]:
import pandas as pd

# Create Database
database_name = "dbpoliticpulsecomment"

# Set S3 staging directory -- this is a temporary directory used for Athena queries
s3_staging_dir = "s3://{0}/athena/staging".format(bucket)

conn = connect(region_name=region, s3_staging_dir=s3_staging_dir)

statement = "CREATE DATABASE IF NOT EXISTS {}".format(database_name)

pd.read_sql(statement, conn)

# Verify DB successfully created
statement = "SHOW DATABASES"

df_show = pd.read_sql(statement, conn)
#df_show.head(5)


In [47]:
# End of Create Database
if database_name in df_show.values:
    ingest_create_athena_db_passed = True

### Tables

In [48]:
# Create Tweeter Tables
table_name_csv = "tweeter"
#s3_private_path_tweeter = "s3://{}/ADS508-staggs/tweeter".format(bucket)

statement = """CREATE EXTERNAL TABLE IF NOT EXISTS {}.{}(
  created_at TIMESTAMP,
  tweet_id FLOAT,
  tweet VARCHAR(250),
  likes INT,
  retweet_count INT,
  source VARCHAR(45),
  user_id INT,
  user_name VARCHAR(250),
  user_screen_name VARCHAR(45),
  user_description VARCHAR(250),
  user_join_date TIMESTAMP,
  user_followers_count INT,
  user_location VARCHAR(45),
  lat FLOAT,
  long FLOAT,
  city VARCHAR(45),
  country VARCHAR(45),
  continent VARCHAR(45),
  state VARCHAR(45),
  state_code VARCHAR(45),
  collected_at VARCHAR(45)
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '{}'
TBLPROPERTIES ('skip.header.line.count'='1')""".format(
    database_name, table_name_csv, s3_private_path_tweeter
)

pd.read_sql(statement, conn)


In [49]:
# testing hashtag_donaldtrump.csv tweet
tweet = "You get a tie! And you get a tie! #Trump ‘s rally #Iowa https://t.co/jJalUUmh5D"
# testing hashtag_joebiden.csv tweet
tweet = "@chrislongview Watching and setting dvr. Let’s give him bonus ratings!! #JoeBiden"


statement = """SELECT * FROM {}.{}
    WHERE tweet = '{}' LIMIT 100""".format(
    database_name, table_name_csv, tweet
)

df = pd.read_sql(statement, conn)
df.head(5)


Unnamed: 0,created_at,tweet_id,tweet,likes,retweet_count,source,user_id,user_name,user_screen_name,user_description,...,user_followers_count,user_location,lat,long,city,country,continent,state,state_code,collected_at


In [39]:
# Create NYT comment Table
table_name_csv = "nyt_comment"
s3_private_path_nyt = "s3://{}/ADS508_project/nyt".format(bucket)

statement = """CREATE EXTERNAL TABLE IF NOT EXISTS {}.{} (
  commentID INT,
  status VARCHAR(20),
  commentSequence INT,
  userID INT,
  userDisplayName VARCHAR(45),
  userLocation VARCHAR(45),
  userTitle VARCHAR(10),
  commentBody VARCHAR(500),
  createDate TIMESTAMP,
  updateDate TIMESTAMP,
  approveDate TIMESTAMP,
  recommendation INT,
  replyCount INT,
  editorsSelection VARCHAR(20),
  parentID INT,
  parentUserDisplayName VARCHAR(45),
  depth INT,
  commentType VARCHAR(20),
  trusted VARCHAR(20),
  recommendedFlag VARCHAR(20),
  permID INT,
  isAnonymous VARCHAR(20),
  articleID VARCHAR(150)
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '{}'
TBLPROPERTIES ('skip.header.line.count'='1')""".format(
    database_name, table_name_csv, s3_private_path_nyt)

pd.read_sql(statement, conn)


In [40]:
statement = "SHOW TABLES in {}".format(database_name)

df_show = pd.read_sql(statement, conn)
df_show.head(5)

Unnamed: 0,tab_name
0,nyt_comment
1,tweeter


In [41]:
commentBody = "@Philip Brown Agree 110%."

statement = """SELECT * FROM {}.{}
    WHERE commentBody = '{}' LIMIT 100""".format(
    database_name, table_name_csv, commentBody
)

df = pd.read_sql(statement, conn)
df.head(5)


Unnamed: 0,commentid,status,commentsequence,userid,userdisplayname,userlocation,usertitle,commentbody,createdate,updatedate,...,editorsselection,parentid,parentuserdisplayname,depth,commenttype,trusted,recommendedflag,permid,isanonymous,articleid


### Parquets

In [None]:
# CREATE A NEW COMBINE TABLE. Union all tweeter and nyt_comment WORKING IN PROGRESS,  GET ERROR : line 25:1: mismatched input 'UNION'. Expecting: <EOF>
# IF IT'S ONLY 1 SELECT AND THEN "B" .."T", THE ERROR MESSAGE: COLUMN_NOT_FOUND: line 2:5: Column 'comment_id' cannot be resolved or requester is not authorized to access requested resources

statement = """
SELECT 
    tweet_id AS comment_id,
    tweet AS comment_body,
    user_id AS user_id,
    user_name AS user_name,
    user_location AS user_location,
    created_at AS create_date,
    retweet_count AS reply_retweet,
    likes AS recommendation_like,
    (LENGTH(tweet) - LENGTH(REPLACE(LOWER(tweet), 'trump', ''))) / LENGTH('trump') AS trump_count,
    (LENGTH(tweet) - LENGTH(REPLACE(LOWER(tweet), 'biden', ''))) / LENGTH('biden') AS biden_count,
    CASE 
        WHEN 
            (LENGTH(tweet) - LENGTH(REPLACE(LOWER(tweet), 'biden', ''))) / LENGTH('biden') > 
            (LENGTH(tweet) - LENGTH(REPLACE(LOWER(tweet), 'trump', ''))) / LENGTH('trump') 
        THEN 1
        WHEN              
            (LENGTH(tweet) - LENGTH(REPLACE(LOWER(tweet), 'biden', ''))) / LENGTH('biden') < 
            (LENGTH(tweet) - LENGTH(REPLACE(LOWER(tweet), 'trump', ''))) / LENGTH('trump') 
        THEN 2
        ELSE NULL
    END AS candidatepoll
FROM 
    tweeter
LIMIT 10

UNION ALL

SELECT 
    commentid AS comment_id,
    commentbody AS comment_body,
    userID AS user_id,
    userDisplayName AS user_name,
    userLocation AS user_location,
    createDate AS create_date,
    replyCount AS reply_retweet,
    recommendation AS recommendation_like,
    (LENGTH(commentbody) - LENGTH(REPLACE(LOWER(commentbody), 'trump', ''))) / LENGTH('trump') AS trump_count,
    (LENGTH(commentbody) - LENGTH(REPLACE(LOWER(commentbody), 'biden', ''))) / LENGTH('biden') AS biden_count,
    CASE
        WHEN
            (LENGTH(commentbody) - LENGTH(REPLACE(LOWER(commentbody), 'biden', ''))) / LENGTH('biden') > 
            (LENGTH(commentbody) - LENGTH(REPLACE(LOWER(commentbody), 'trump', ''))) / LENGTH('trump') 
        THEN 1
        WHEN
            (LENGTH(commentbody) - LENGTH(REPLACE(LOWER(commentbody), 'biden', ''))) / LENGTH('biden') < 
            (LENGTH(commentbody) - LENGTH(REPLACE(LOWER(commentbody), 'trump', ''))) / LENGTH('trump') 
        THEN 2
        ELSE NULL
    END AS candidatepoll
FROM 
    nyt_comment
LIMIT 10
"""


In [None]:
# Setup to create Parquet
ingest_create_athena_table_parquet_passed = False

# Set S3 path to Parquet data
s3_path_parquet = "s3://{}/ADS508_project/parquet".format(bucket)

table_comment = "comment"

table_parquet = "comment_parquet"

In [None]:
# SQL statement to execute
statement = """CREATE TABLE IF NOT EXISTS {}.{}
WITH (format = 'PARQUET', external_location = '{}', partitioned_by = ARRAY['candidatepoll']) AS
SELECT comment_id,
         username,
         userlocation,
         comment_body,
         reply_retweet,
         recommendation_likes,
         trump_count,
         biden_count,
         candidatepoll
         .....
FROM {}.{}""".format(
    database_name, table_parquet, s3_path_parquet, database_name, table_comment
)

print(statement)

## Query Data

In [None]:
# Setup and check pre-requisites to create Database
%store -r ingest_create_athena_table_passed
!pip install --disable-pip-version-check -q awswrangler==2.3.0
import awswrangler as wr


In [None]:
# Read in Datasets

In [None]:
# Dataset Info

# Data Exploration

In [None]:
# Dataset Descriptives

In [None]:
# Data Distributions

In [None]:
# Filtering Text by Location