# Base Declaration of Project Libraries
> To make data cleaning and ingestion effective, split the dataset into smaller files and run the standard formatting steps on each file.

In [None]:
import boto3
import psycopg2

import pandas as pd
import numpy as np
from dotenv import load_dotenv
import os
import csv
import re

import simplejson as json
import random
from datetime import datetime
from dateutil import parser
import matplotlib.pyplot as plt
import math
import mysql.connector

### Initial Review & Assessment of the raw data file
> Note: the separator for this dataset is a semicolon (`;`), not the traditional comma.

In [None]:
# Raw Lifebear dump; fields are separated by ';' rather than ','.
file_path = ".//datasets//ingested//lifebear//lifebear.csv"

lifebear_dataset = pd.read_csv(
    file_path,
    sep=';',
    low_memory=True,
)
lifebear_dataset.tail(5)

### Dataset Chunking
> Breaking a large file into multiple smaller files makes ingestion more reliable, because each chunk can be loaded and processed independently.

In [None]:
# Source dump to split, and the directory that receives the numbered chunks.
file_path = ".//datasets//ingested//lifebear//lifebear.csv"
output_folder = ".//datasets//ingested//lifebear//records//chunks//"


def split_file(file_path, chunk_size, output_directory):
    """Split a text file into numbered chunk files of roughly ``chunk_size`` MiB.

    Lines are never split across chunks. A new chunk is started before the
    next line once the current chunk has reached the limit, so a chunk can
    exceed it by at most one line. Each raw line is decoded as UTF-8, with a
    Latin-1 fallback, and is written to the chunk as UTF-8.

    Chunks are named ``lifebear_001.csv``, ``lifebear_002.csv``, ... Only the
    first chunk receives the source file's first line (its header, if any).

    Args:
        file_path: Path of the file to split.
        chunk_size: Target chunk size in MiB, measured in raw input bytes.
        output_directory: Directory for the chunk files; created if missing.
    """
    os.makedirs(output_directory, exist_ok=True)
    limit = chunk_size * 1024 * 1024

    file_count = 0
    current_size = 0
    output_file = None
    try:
        with open(file_path, 'rb') as source:
            for line in source:
                if output_file is None or current_size >= limit:
                    if output_file is not None:
                        output_file.close()
                    file_count += 1
                    current_size = 0
                    file_name = f'lifebear_{str(file_count).zfill(3)}.csv'
                    current_file_path = os.path.join(output_directory, file_name)
                    # newline='' writes line endings exactly as read, instead of
                    # translating '\n' (which turns '\r\n' into '\r\r\n' on Windows).
                    output_file = open(current_file_path, 'w', encoding='utf-8', newline='')

                try:
                    decoded_line = line.decode('utf-8')
                except UnicodeDecodeError:
                    # Latin-1 maps every byte value, so this fallback cannot fail.
                    decoded_line = line.decode('latin-1')

                output_file.write(decoded_line)
                current_size += len(line)
    finally:
        # Close the last chunk even if reading or writing raised midway.
        if output_file is not None:
            output_file.close()

    print(f'Splitting complete. Total files created: {file_count}')

# Split the raw dump into ~100 MiB chunks.
split_file(file_path, chunk_size=100, output_directory=output_folder)


### Review of chunked dataset
> Load one chunk into pandas. Note: the separator for this dataset is a semicolon (`;`), not the traditional comma.

In [None]:
# First chunk written by split_file. It is the only chunk that starts with the
# source file's first line, so pandas can use that line as the header.
file_path = ".//datasets//ingested//lifebear//records//chunks//lifebear_001.csv"

lifebear_dataset = pd.read_csv(file_path, low_memory=True, sep=';')
lifebear_dataset.tail(5)

### Removal of duplicate records/rows based on specific columns
> To keep the data consistent, we check for duplicate rows based on columns that should be unique (e.g. email, mobile number, credit card number).

In [None]:
# NOTE(review): these paths and columns belong to the Condo dataset, not
# Lifebear. This cell looks like it was carried over from another notebook.
# Confirm whether it should run here.
file_path = ".//datasets//Condo//records//condo_raw_dataset.csv"
output_file_name = ".//datasets//Condo//records//v0//condo_raw_dataset_wd.csv"

# Keep only the first row for each (first_name, email, work_phone) combination.
condo_dataset = pd.read_csv(file_path, low_memory=True)
condo_wd_dataset = condo_dataset.drop_duplicates(
    subset=["first_name", "email", "work_phone"]
)
condo_wd_dataset.to_csv(output_file_name, index=False, encoding='utf-8')

# Re-read the written file and summarise it to confirm what landed on disk.
condo_dataset = pd.read_csv(output_file_name, low_memory=True)
condo_dataset.info()

In [None]:
import os
import csv
import re
from datetime import datetime

def only_letters(text):
    """Return True when *text* is non-empty and every character is alphabetic.

    Delegates to ``text.isalpha()``, which is False for the empty string.
    """
    result = text.isalpha()
    return result

def only_numbers(phone):
    """Return True when ``str(phone)`` is non-empty and uses only digits, '(', ')', '.' or spaces."""
    allowed = re.compile(r'^[0-9(). ]+$')
    return re.fullmatch(allowed, str(phone)) is not None

def validate_characters(username):
    """Return True when ``str(username)`` uses only the permitted username characters.

    Permitted: ASCII letters, digits, ``& | # ( ) / - . _`` and space. The
    pattern uses ``*``, so an empty string is accepted.
    """
    allowed = re.compile(r'^[a-zA-Z0-9&|#()/\-._ ]*$')
    return re.fullmatch(allowed, str(username)) is not None

def validate_length(username):
    """Return True when ``str(username)`` is under 16 characters and under 16 UTF-8 bytes."""
    text = str(username)
    if len(text) >= 16:
        return False
    return len(text.encode('utf-8')) < 16

def validate_username(username):
    """Return True when *username* passes both the character-set and length checks."""
    if not validate_characters(username):
        return False
    return validate_length(username)

def validate_name(name):
    """Return True when ``str(name)`` consists of one or more ASCII letters only."""
    return re.fullmatch(r'^[a-zA-Z]+$', str(name)) is not None

def validate_phone(phone):
    """Return True when ``str(phone)`` is non-empty and uses only digits, '(', ')', '.', '+' or spaces."""
    match = re.fullmatch(r'^[0-9().+ ]+$', str(phone))
    return match is not None

def validate_ip(ip):
    """Loose IP check: ``str(ip)`` must be non-empty and contain only digits, '.' and ':'.

    Octet ranges are not checked, and IPv6 hex letters are rejected.
    """
    match = re.fullmatch(r'^[0-9.:]+$', str(ip))
    return match is not None

def validate_passhash(pass_hash):
    """Return True when ``str(pass_hash)`` is non-empty and uses only hash-safe characters.

    Allowed: ASCII letters, digits, ``& | # $ _ ( ) / . + =`` and space.
    """
    hash_pattern = re.compile(r'^[a-zA-Z0-9&|#$_()\/.+= ]+$')
    return re.fullmatch(hash_pattern, str(pass_hash)) is not None

def validate_dob(dob):
    """Return True when ``str(dob)`` is non-empty and uses only date/time characters.

    Allowed: digits, ``| : ( ) / -`` and space. A later cell in this notebook
    redefines ``validate_dob`` with a series-based signature.
    """
    match = re.fullmatch(r'^[0-9|:()/\- ]+$', str(dob))
    return match is not None

def validate_email(email):
    """Return True when ``str(email)`` looks like ``local@domain.tld``.

    Uses the same pattern as the email checks elsewhere in this notebook. The
    previous pattern rejected valid addresses: single-character local parts,
    uppercase letters, '+' or '-' in the local part, and multi-label domains
    such as ``mail.co.uk``.
    """
    email_pattern = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
    return bool(re.fullmatch(email_pattern, str(email)))

def validate_address(address):
    """Return True when ``str(address)`` is non-empty and uses only the allowed characters.

    Allowed: ASCII letters, digits, ``& | # $ ( ) / . + = :`` and space.
    ``process_file`` also uses this check for the password column.
    """
    allowed = re.compile(r'^[a-zA-Z0-9&|#$()\/.+=: ]+$')
    return re.fullmatch(allowed, str(address)) is not None

def validate_state(state):
    """Return True when ``str(state)`` is non-empty and uses only letters, ``& | # ( ) .`` or space."""
    match = re.fullmatch(r'^[a-zA-Z&|#(). ]+$', str(state))
    return match is not None


def to_camel_case(s):
    """Capitalise every whitespace-separated word and join the words with single spaces.

    Despite the name, this produces Title Case ("foo bar" -> "Foo Bar"), not
    camelCase. ``None`` is returned unchanged.
    """
    if s is None:
        return None
    words = [word.capitalize() for word in s.split()]
    return ' '.join(words)


def to_lower(s):
    """Return *s* lower-cased, passing ``None`` through unchanged."""
    return None if s is None else s.lower()


def convert_date_format(date_str):
    """Reformat an ISO ``YYYY-MM-DD`` date string as ``MM/DD/YYYY``.

    Raises:
        ValueError: if *date_str* does not parse with ``%Y-%m-%d``.
    """
    return datetime.strptime(date_str, "%Y-%m-%d").strftime("%m/%d/%Y")

# Function to process a single file
def process_file(input_file_path, output_folder):
    """Clean one semicolon-delimited Lifebear chunk into a comma-separated CSV.

    Each input line is split on ';' and these fields are read:

        username[1]; email[2]; password[3]; salt[5]; birthday_on[6]; gender[7]

    Fields [0] and [4] are ignored. A row is copied verbatim (as a single CSV
    column) to ``<output_folder>/garbage/<name>_garbage.csv`` and skipped when:

    - its email is present but does not match the email pattern, or
    - its birthday is present, does not look like an e-mail, and does not
      parse as ``YYYY-MM-DD``.

    Every other row is written to ``<output_folder>/<name>_structured.csv``
    under the header ``username,email,password,salt,dob,gender``. A field that
    fails its own check is written as an empty value; the row is still kept.

    The input's first line is not skipped, so a header line in the chunk is
    processed like any other record.

    Args:
        input_file_path: Path of a UTF-8 chunk file.
        output_folder: Directory for the structured output. It and its
            ``garbage`` subdirectory are created if missing.
    """
    filename = os.path.splitext(os.path.basename(input_file_path))[0]
    output_file_path = os.path.join(output_folder, f"{filename}_structured.csv")
    garbage_file_path = os.path.join(output_folder, f"garbage//{filename}_garbage.csv")

    # open(..., 'w') does not create directories. Without this, a fresh run
    # fails with FileNotFoundError.
    os.makedirs(os.path.join(output_folder, "garbage"), exist_ok=True)

    # Values that look like "x@y" are never accepted into non-email columns.
    email_like = r'\S+@\S+'
    email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'

    with open(output_file_path, 'w', newline='', encoding='utf8') as output_file, \
            open(garbage_file_path, 'w', newline='', encoding='utf8') as garbage_file:
        output_writer = csv.writer(output_file)
        garbage_writer = csv.writer(garbage_file)
        output_writer.writerow(["username", "email", "password", "salt", "dob", "gender"])

        with open(input_file_path, 'r', encoding='utf8') as input_file:
            for line in input_file:
                username = email = password = dob = salt = gender = None
                raw_line = line.strip()
                parts = raw_line.split(';')

                # USERNAME [1]: must pass validate_username and not look like an e-mail.
                if len(parts) > 1 and parts[1]:
                    probe = parts[1].replace('"', '').replace("'", '').replace(' ', '')
                    if validate_username(probe) and not re.match(email_like, probe):
                        username = parts[1].replace('.', '').replace('"', '').replace("'", '')

                # EMAIL [2]: quotes, tabs and '#' are stripped. "null" or an empty
                # value means no email; anything else must match or the row is rejected.
                if len(parts) > 2:
                    cleaned_email = re.sub(r"[\"'\t#]", '', parts[2]).strip()
                    if cleaned_email.lower() != "null" and cleaned_email != "":
                        if not re.match(email_pattern, cleaned_email):
                            garbage_writer.writerow([raw_line])
                            continue
                        email = cleaned_email

                # DOB [6]: YYYY-MM-DD is reformatted to MM/DD/YYYY. A value that
                # cannot be parsed rejects the whole row.
                if len(parts) > 6 and parts[6]:
                    probe = parts[6].replace('"', '').replace("'", '').replace('.', '').replace('-', '')
                    if not re.match(email_like, probe):
                        try:
                            dob = convert_date_format(
                                parts[6].replace('"', '').replace("'", '').replace("+", '')
                                .replace('\t', '').replace(' ', '')
                            )
                        except ValueError:
                            garbage_writer.writerow([raw_line])
                            continue

                # GENDER [7]: kept only when it is numeric-looking once the
                # punctuation is removed.
                if len(parts) > 7 and parts[7]:
                    numeric_probe = (parts[7].replace("'", "").replace('"', '').replace("-", "")
                                     .replace("+", "").replace(" ", "").replace("(", "").replace(")", ""))
                    email_probe = (parts[7].replace('"', '').replace("+", "").replace("'", '')
                                   .replace(' ', '').replace('-', ''))
                    if only_numbers(numeric_probe) and not re.match(email_like, email_probe):
                        gender = (parts[7].replace('"', '').replace("'", '').replace("+", '').replace('~', '')
                                  .replace('\t', '').replace(" ", "").replace("(", "").replace(")", "")
                                  .replace("-", ""))

                # PASSWORD [3]: the character set is checked with validate_address.
                if len(parts) > 3 and parts[3]:
                    probe = parts[3].replace("'", "").replace('"', '').replace("-", "").replace(" ", "")
                    if validate_address(probe) and not re.match(email_like, probe):
                        password = (parts[3].replace('"', '').replace("'", '').replace("+", '')
                                    .replace('~', '').replace('\t', '').replace(',', ''))

                # SALT [5]: NOTE(review): '+' and '/' are stripped from the stored
                # salt. If salts are base64 this changes their value; confirm.
                if len(parts) > 5 and parts[5]:
                    probe = parts[5].replace('"', '').replace("'", '').replace('.', '')
                    if not re.match(email_like, probe):
                        salt = (parts[5].replace('"', '').replace("'", '').replace("+", '')
                                .replace('/', '').replace('\t', ''))

                email = to_lower(email)
                username = to_lower(username)

                output_writer.writerow([username, email, password, salt, dob, gender])

            print(f"Conversion completed for {filename}. Valid data saved to {filename}_structured.csv. Invalid data saved to {filename}_garbage.csv.")

# Chunk folder. It must match the lower-case "lifebear" directory that
# split_file writes to; "Lifebear" only resolves on case-insensitive filesystems.
input_folder = "./datasets/ingested/lifebear/records/chunks/"
# Structured output (plus a garbage/ subfolder for rejected rows) goes here.
output_folder = "./datasets/ingested/Lifebear/processed/"
os.makedirs(os.path.join(output_folder, "garbage"), exist_ok=True)

# Sorted so the chunks are processed in a deterministic, numbered order.
csv_files = sorted(file for file in os.listdir(input_folder) if file.endswith(".csv"))

for csv_file in csv_files:
    input_file_path = os.path.join(input_folder, csv_file)
    process_file(input_file_path, output_folder)


In [None]:
import os

def remove_empty_lines(input_file, output_file):
    """Copy *input_file* to *output_file*, dropping blank and placeholder lines.

    A line is dropped when it is whitespace-only, or contains the substring
    ',,,,' or 'NULL,,,NULL,'. Any exception is reported with print() rather
    than raised, so one failing file does not stop a batch.
    """
    def keep(text):
        return bool(text.strip()) and ',,,,' not in text and 'NULL,,,NULL,' not in text

    try:
        with open(input_file, 'r', encoding='utf-8') as src, open(output_file, 'w', encoding='utf-8') as dst:
            dst.writelines(text for text in src if keep(text))
        print(f"Lines with specified text removed successfully from {input_file}!")
    except Exception as e:
        print(f"An error occurred with file {input_file}: {e}")


input_folder = "./datasets/ingested/Lifebear/processed/"
output_folder = "./datasets/ingested/Lifebear/to-be-ingested/"
# remove_empty_lines only prints errors. A missing output folder would
# therefore silently produce no files, so create it up front.
os.makedirs(output_folder, exist_ok=True)

# Only .csv files; the garbage/ subfolder is skipped by this filter.
csv_files = sorted(file for file in os.listdir(input_folder) if file.endswith(".csv"))

for csv_file in csv_files:
    input_file_path = os.path.join(input_folder, csv_file)
    output_file_path = os.path.join(output_folder, csv_file)  # same file name, new folder
    remove_empty_lines(input_file_path, output_file_path)


In [None]:
import os
import csv
import codecs

def remove_non_utf8(text):
    """Return *text* with every character that cannot be encoded as UTF-8 removed.

    For a Python ``str``, the only such characters are lone surrogate code
    points. Any other string is returned unchanged.
    """
    encoded = text.encode('utf-8', errors='ignore')
    return encoded.decode('utf-8')

input_folder = ".//datasets//ingested//Lifebear//to-be-ingested//"
# Cleaned copies go into a "final" subfolder. It is a directory, so the
# .csv filter below never picks it up as an input.
output_folder = ".//datasets//ingested//Lifebear//to-be-ingested//final//"
os.makedirs(output_folder, exist_ok=True)

csv_files = [file for file in os.listdir(input_folder) if file.endswith(".csv")]

for csv_file in csv_files:
    input_file_path = os.path.join(input_folder, csv_file)
    output_file_path = os.path.join(output_folder, f"{os.path.splitext(csv_file)[0]}_cleaned.csv")

    # errors='ignore' drops undecodable bytes while reading. newline='' is
    # required by the csv module for correct handling of '\r\n' terminators
    # and of newlines inside quoted fields. Built-in open() replaces
    # codecs.open, which is deprecated.
    with open(input_file_path, 'r', encoding='utf-8', errors='ignore', newline='') as input_file, \
            open(output_file_path, 'w', newline='', encoding='utf-8') as output_file:
        csv_reader = csv.reader(input_file)
        csv_writer = csv.writer(output_file)

        for row in csv_reader:
            csv_writer.writerow([remove_non_utf8(field) for field in row])

    print(f"Cleaning completed for {csv_file}. Cleaned data saved to {os.path.basename(output_file_path)}.")


In [None]:

# Folder whose .csv files are scrubbed in place.
folder_path = ".//datasets//ingested//Lifebear//to-be-ingested//final//"

# (old, new) substitutions, applied to each whole line strictly in this order.
# Order matters: a later removal can create text that an earlier step would
# have matched.
_LINE_SCRUBS = (
    ('*', ''), ('------', ''), ('?', ''), ('~', ''), ('^', ''), ('$', ''),
    ('"', ''), ("'", ''), ('|', ''), ('!', ''), ('#', ''), ('(', ''),
    (')', ''), ('\\', ''), ('/@', '@'), ('zz', ''), ('n/a', ''), ('`', ''),
    ('&', ''), ('*', ''), ('+', ''),
)


def _scrub_line(text):
    """Apply every substitution in _LINE_SCRUBS to *text*, in order."""
    for old, new in _LINE_SCRUBS:
        text = text.replace(old, new)
    return text


for filename in os.listdir(folder_path):
    if not filename.endswith(".csv"):
        continue
    file_path = os.path.join(folder_path, filename)

    # latin-1 decodes every byte, and every substring touched above is ASCII.
    # Apart from newline translation, the bytes of non-ASCII characters are
    # therefore written back unchanged.
    with open(file_path, 'r+', encoding='latin-1') as file:
        lines = file.readlines()
        cleaned_lines = [_scrub_line(line) for line in lines]
        file.seek(0)
        file.truncate()
        file.writelines(cleaned_lines)

    print(f"Specific characters removed from the file: {filename}")

print("Cleaning operation completed for all files in the folder.")


In [None]:
file_path = ".//datasets//ingested//Lifebear//to-be-ingested//final//lifebear_004_structured_cleaned.csv"
output_file_name = ".//datasets//ingested//Lifebear//to-be-ingested//final//lifebear_wd_final_4.csv"

# Drop rows repeating the same (username, salt, email), keeping the first one.
# NOTE(review): only chunk 004 is handled here, so duplicates that span
# chunks are not detected. Confirm this per-chunk workflow is intended.
# pandas infers numeric dtypes: a numeric column with blanks (e.g. gender)
# becomes float and is written back as "1.0".
lifebear_dataset = pd.read_csv(file_path, low_memory=True)
lifebear_wd_dataset = lifebear_dataset.drop_duplicates(subset=["username", "salt", "email"])
lifebear_wd_dataset.to_csv(output_file_name, encoding='utf-8', index=False)

# Re-read the written file and summarise it to confirm its contents.
lifebear_dataset = pd.read_csv(output_file_name, low_memory=True)
lifebear_dataset.info()

In [None]:

# Folder whose .csv files are scrubbed and truncated in place.
folder_path = ".//datasets//ingested//Lifebear//to-be-ingested//final//v0//"

# (old, new) substitutions applied to selected fields, strictly in this order.
# NOTE(review): removing '.0' presumably undoes pandas' float formatting of
# numeric columns ("1.0" -> "1"). It also alters text such as "x.01". Confirm.
_FIELD_SCRUBS = (
    ('*', ''), ('..', ''), ('------', ''), ('?', ''), ('~', ''), ('^', ''),
    ('$', ''), ('"', ''), ("'", ''), ('|', ''), ('!', ''), ('#', ''),
    ('(', ''), (')', ''), ('\\', ''), ('/@', '@'), ('zz', ''), ('n/a', ''),
    ('`', ''), ('&', ''), ('*', ''), ('+', ''), ('null', ''), ('.0', ''),
)


def truncate_field(field, max_bytes=64):
    """Return *field* cut to at most *max_bytes* bytes of UTF-8.

    A multi-byte character that the cut would split is dropped entirely
    rather than emitted as a partial sequence. ``max_bytes=None`` disables
    truncation and returns *field* unchanged.
    """
    if max_bytes is None:
        return field
    byte_field = field.encode('utf-8')
    if len(byte_field) > max_bytes:
        return byte_field[:max_bytes].decode('utf-8', errors='ignore')
    return field


def clean_and_truncate_row(row, columns_to_truncate, max_bytes=32):
    """Scrub and byte-truncate the selected fields of a CSV row.

    For each field whose index is in *columns_to_truncate*, every substring
    in ``_FIELD_SCRUBS`` is removed ('/@' becomes '@'). The field is then cut
    to *max_bytes* UTF-8 bytes with ``truncate_field``. Other fields are
    returned untouched. A new list is returned; *row* is not modified.
    """
    selected = set(columns_to_truncate)
    cleaned_row = []
    for i, field in enumerate(row):
        if i in selected:
            for old, new in _FIELD_SCRUBS:
                field = field.replace(old, new)
            field = truncate_field(field, max_bytes)
        cleaned_row.append(field)
    return cleaned_row

# Column indices to scrub and truncate. With the structured header
# (username, email, password, salt, dob, gender) these are username, email
# and gender.
columns_to_truncate = [0, 1, 5]

for filename in os.listdir(folder_path):
    if not filename.endswith(".csv"):
        continue
    file_path = os.path.join(folder_path, filename)

    # newline='' is what the csv module requires for correct row parsing.
    with open(file_path, 'r', encoding='utf-8', newline='') as file:
        lines = list(csv.reader(file))

    # NOTE(review): clean_and_truncate_row's default max_bytes (32) applies,
    # so emails longer than 32 bytes are cut. Confirm this is intended.
    cleaned_lines = [clean_and_truncate_row(line, columns_to_truncate) for line in lines]

    # Overwrite the file in place with the cleaned rows.
    with open(file_path, 'w', encoding='utf-8', newline='') as file:
        csv.writer(file).writerows(cleaned_lines)

    print(f"Cleaning and truncation completed for the file: {filename}")

print("Operation completed for all files in the folder.")


In [None]:
def validate_dob(name_series):
    """Return an error message for each date-of-birth value with unexpected characters.

    Allowed characters: digits, ``& | # ( ) - / .`` and space. Values pass
    through ``str()`` first, so a float NaN becomes ``"nan"`` and is reported.
    Row numbers are 1-based positions in *name_series*. This redefines the
    single-value ``validate_dob`` from an earlier cell.
    """
    dob_errors = []
    # '-' is escaped: unescaped, ")-/" formed a character range that also
    # accepted '*', '+' and ','.
    dob_pattern = re.compile(r'^[0-9&|#()\-/. ]+$')

    for i, dob in enumerate(name_series):
        if not dob_pattern.fullmatch(str(dob)):
            dob_errors.append(f"Invalid characters in dob in row {i + 1}: {dob}")

    return dob_errors

In [None]:
def validate_usernames(name_series):
    """Collect an error message for each username containing disallowed characters.

    Allowed: ASCII letters, digits, ``& | # ( ) .`` and space. Values pass
    through ``str()``. Row numbers in the messages are 1-based.
    NOTE(review): this set is narrower than the cleaning step's username
    check ('_', '-' and '/' are rejected here). Confirm which is intended.
    """
    allowed = re.compile(r'^[a-zA-Z0-9&|#(). ]+$')
    return [
        f"Invalid characters in name in row {row}: {name}"
        for row, name in enumerate(name_series, start=1)
        if not re.match(allowed, str(name))
    ]

In [None]:
def validate_names(name_series):
    """Collect an error message for each name that is not purely ASCII letters.

    Values pass through ``str()``. Row numbers in the messages are 1-based.
    """
    letters_only = re.compile(r'^[a-zA-Z]+$')
    return [
        f"Invalid characters in name in row {row}: {name}"
        for row, name in enumerate(name_series, start=1)
        if not re.match(letters_only, str(name))
    ]

In [None]:
def validate_emails(email_series):
    """Collect an error message for each non-blank value that is not a plausible email.

    Missing values (per ``pd.isnull``) and whitespace-only strings are
    skipped. Row numbers in the messages are 1-based.
    """
    pattern = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
    errors = []
    for row, email in enumerate(email_series, start=1):
        if pd.isnull(email) or not str(email).strip():
            continue
        if re.match(pattern, str(email)) is None:
            errors.append(f"Invalid email address in row {row}: {email}")
    return errors

In [None]:
def generate_report(validation_errors, total_rows):
    """Plot a bar chart of validation errors per column, or print a success message.

    Args:
        validation_errors: Mapping of column label to a list of error messages.
        total_rows: Number of rows validated. Used to label each bar with its
            error percentage.

    When no list contains any error, prints a success message and plots
    nothing.
    """
    # A dict with keys but only empty lists is truthy. Testing the lists
    # themselves makes the success branch reachable.
    if not any(validation_errors.values()):
        print("Data type validation successful!")
        return

    columns = list(validation_errors.keys())
    error_count = [len(errors) for errors in validation_errors.values()]
    # Guard against division by zero for an empty dataset.
    error_percentage = [(count / total_rows) * 100 if total_rows else 0.0 for count in error_count]

    plt.figure(figsize=(10, 6))
    plt.bar(columns, error_count)
    plt.xlabel('Columns')
    plt.ylabel('Error Count')
    plt.title('Lifebear Dataset - Record Validation Testing')

    # The y-axis is in error counts, so each percentage label sits at its
    # bar's count (just above the bar), not at the percentage value.
    for i, (count, percentage) in enumerate(zip(error_count, error_percentage)):
        plt.text(i, count + 0.1, f"{percentage:.2f}%", ha='center', va='bottom')

    plt.show()


In [None]:
file_path = ".//datasets//ingested//Lifebear//to-be-ingested//final//v0//lifebear_wd_final_3.csv"
df = pd.read_csv(file_path, low_memory=False)
pd.options.display.min_rows = 10

email_errors = validate_emails(df["email"])
username_errors = validate_usernames(df["username"])

all_errors = {
    'Email Errors': email_errors,
    'Username Errors': username_errors,
}

num_rows, num_columns = df.shape
# NOTE(review): this sums error messages, so a row failing both checks is
# counted twice in "Invalid Records".
total_errors = len(email_errors) + len(username_errors)
generate_report(all_errors, num_rows)

print(f'Verification Testing Complete. \n\n Total {num_rows} Rows Tested, {total_errors} Invalid Records FOUND!')
# Last ten email errors.
print(f'Email Errors {email_errors[-10:]}')
print()
# Last twelve username errors.
print(f'Username Errors {username_errors[-12:]}')
print()
print()

## Creating the Redshift staging table and registering the leak

In [None]:
load_dotenv()

conn = psycopg2.connect(
    host=os.getenv('REDSHIFT_STAGE_HOST'),
    port=os.getenv('REDSHIFT_STAGE_PORT'),
    database=os.getenv('REDSHIFT_STAGE_DB'),
    user=os.getenv('REDSHIFT_STAGE_USER'),
    password=os.getenv('REDSHIFT_STAGE_PASSWORD')
)

# Staging table with one VARCHAR column per structured CSV field:
# username, email, password, salt, dob, gender.
# Without IF NOT EXISTS this raises when the table already exists, so a
# re-run stops here before inserting the info row a second time.
create_table_stmt = """
CREATE TABLE "dev"."lifebear_user_data"(
"username" character varying(4096) NULL,
"email" character varying(4096) NULL,
"password" character varying(4096) NULL,
"salt" character varying(4096) NULL,
"dob" character varying(4096) NULL,
"gender" character varying(4096) NULL
)ENCODE AUTO;
"""

info_table_name = "leaks.prod.info"
leak_info = "In early 2019, the Japanese schedule app Lifebear appeared for sale on a dark web marketplace amongst a raft of other hacked websites. The breach exposed almost 3.7M unique email addresses, usernames and passwords stored as salted MD5 hashes."
leak_source = "lifebear.com"
leak_download = "https://pixeldrain.com/u/RDKXZu9y"
leak_date = "2019-02-28"
ingestion_date = "2024-10-04"

# Values are bound as query parameters, so text containing quotes cannot
# break or alter the statement. The table name is a trusted constant.
info_insert_stmt = f'''
INSERT INTO {info_table_name}
("information", "source", "foundwhere", "leakdate", "adddate")
VALUES (%s, %s, %s, %s, %s);
'''

info_v2_query_stmt = 'select * from leaks.prod.info order by id desc LIMIT 10'

cursor = conn.cursor()
try:
    cursor.execute(create_table_stmt)
    conn.commit()

    cursor.execute(info_insert_stmt, (leak_info, leak_source, leak_download, leak_date, ingestion_date))
    conn.commit()

    # Show the most recent info rows to confirm the insert.
    cursor.execute(info_v2_query_stmt)
    for rec in cursor.fetchall():
        print(rec)
finally:
    cursor.close()
    # Closing without commit discards any transaction left open by a failure.
    conn.close()


## Uploading S3 records to Redshift

In [None]:

load_dotenv()

conn = psycopg2.connect(
    host=os.getenv('REDSHIFT_STAGE_HOST'),
    port=os.getenv('REDSHIFT_STAGE_PORT'),
    database=os.getenv('REDSHIFT_STAGE_DB'),
    user=os.getenv('REDSHIFT_STAGE_USER'),
    password=os.getenv('REDSHIFT_STAGE_PASSWORD')
)

table_name = "leaks.dev.lifebear_user_data"
s3_resource = "s3://leaksraw/priorities_1_NOTDONE/Lifebear/to-be-ingested/final/v0/"

# Bulk-load every file under the S3 prefix into the staging table. Files are
# comma-delimited, and the first line of each is skipped as a header.
# NOTE(review): the IAM role ARN is hard-coded and account-specific. Consider
# reading it from the environment like the connection settings.
copy_stmt = f'''
COPY {table_name} ("username", "email", "password", "salt", "dob", "gender")
FROM '{s3_resource}' IAM_ROLE 'arn:aws:iam::882736527955:role/service-role/AmazonRedshift-CommandsAccessRole-20230606T095354'
DELIMITER ','
REMOVEQUOTES
ESCAPE
IGNOREHEADER 1
REGION AS 'us-east-1';
'''

query_str = 'SELECT count(*) FROM leaks.dev.lifebear_user_data'

cursor = conn.cursor()
try:
    cursor.execute(copy_stmt)
    conn.commit()

    # Row count after the load (3827910 was recorded here previously).
    cursor.execute(query_str)
    for rec in cursor.fetchall():
        print(rec)
finally:
    cursor.close()
    conn.close()

# Confirmation Query

In [None]:
load_dotenv()

conn = psycopg2.connect(
    host=os.getenv('REDSHIFT_STAGE_HOST'),
    port=os.getenv('REDSHIFT_STAGE_PORT'),
    database=os.getenv('REDSHIFT_STAGE_DB'),
    user=os.getenv('REDSHIFT_STAGE_USER'),
    password=os.getenv('REDSHIFT_STAGE_PASSWORD')
)

# Spot-check a sample of the staged rows.
query_str = 'SELECT * FROM dev.lifebear_user_data LIMIT 20'

cursor = conn.cursor()
try:
    cursor.execute(query_str)
    for rec in cursor.fetchall():
        print(rec)
finally:
    cursor.close()
    conn.close()

# Final Ingestion to Leaks Table

In [None]:
load_dotenv()

conn = psycopg2.connect(
    host=os.getenv('REDSHIFT_STAGE_HOST'),
    port=os.getenv('REDSHIFT_STAGE_PORT'),
    database=os.getenv('REDSHIFT_STAGE_DB'),
    user=os.getenv('REDSHIFT_STAGE_USER'),
    password=os.getenv('REDSHIFT_STAGE_PASSWORD')
)
# Column mapping:
#   LEAKS    ==> username, firstname, lastname, email, password, ip, address,
#                company_name, phone, dob, web_domain
#   Lifebear ==> username, email, password, salt, dob, gender
# Only username, email, password and dob are carried over.

leaks_table_name = "leaks.prod.leaks"
ingested_table_name = "leaks.dev.lifebear_user_data"
# NOTE(review): hard-coded; presumably the id of the Lifebear row in
# leaks.prod.info. Confirm before re-running.
info_id = 4519

leaks_insert_stmt = f'''
INSERT INTO {leaks_table_name}
("username", "email", "password", "dob", "info_id")
SELECT "username", "email", "password", "dob", %s AS "info_id"
FROM {ingested_table_name}
'''
# Count only this leak's rows. Counting the whole leaks table overstated
# the number of records ingested.
leaks_count_stmt = f'select count(*) from {leaks_table_name} where info_id = %s'

cursor = conn.cursor()
try:
    cursor.execute(leaks_insert_stmt, (info_id,))
    conn.commit()

    cursor.execute(leaks_count_stmt, (info_id,))
    count = cursor.fetchone()[0]

    print(f'{count} Records successfully ingested!')
finally:
    cursor.close()
    conn.close()


In [None]:
# Safe to call repeatedly; makes this cell runnable on its own.
load_dotenv()

conn = psycopg2.connect(
    host=os.getenv('REDSHIFT_STAGE_HOST'),
    port=os.getenv('REDSHIFT_STAGE_PORT'),
    database=os.getenv('REDSHIFT_STAGE_DB'),
    user=os.getenv('REDSHIFT_STAGE_USER'),
    password=os.getenv('REDSHIFT_STAGE_PASSWORD')
)

# Sample of the rows ingested into the production leaks table for this leak.
query_str = 'SELECT * FROM leaks.prod.leaks where info_id = 4519 LIMIT 10'

cursor = conn.cursor()
try:
    cursor.execute(query_str)
    for rec in cursor.fetchall():
        print(rec)
finally:
    cursor.close()
    conn.close()