Import neccessary python libraries

In [164]:
import pandas as pd
from sqlalchemy import create_engine
import configparser
import io
import boto3
import psycopg2
import openpyxl

Manual collection of configs

In [165]:
config = configparser.ConfigParser()

# read the configuration file
config.read('multi_config.ini')

# get all the connections
config.sections()

['postgresql', 'aws_s3', 'csv', 'stmplib']

In [None]:
### Manual collection of credentials
'''
Authenticate the Postgres and S3 database by getting the credentials from the config file
'''
# Postgres SQL Credentials 
database = config.get('postgresql', 'database')
user = config.get('postgresql', 'user')
password = config.get('postgresql', 'password')
host = config.get('postgresql', 'host')
port = config.get('postgresql', 'port')

# AWS Credentials
service_name = config.get('aws_s3', 'service_name')
region_name = config.get('aws_s3', 'region_name')
aws_access_key_id = config.get('aws_s3', 'aws_access_key_id')
aws_secret_access_key = config.get('aws_s3', 'aws_secret_access_key')
s3_bucket = config.get('aws_s3', 's3_bucket')

# Local Credentials
local_file_path = config.get('csv', 'source')

# check creditials
print("Authentication successful \n")
print(f'The database is "{database}" and the service_name is "{service_name}" and local path is "{local_file_path}"')

Automated collection of config

In [190]:
##creation of function for credentials for authentication file
def get_credentials(sections, credentials):

    config = configparser.ConfigParser()

    config.read('multi_config.ini')

    collect_config = {}

    for i, section in enumerate(sections):
        section_credentials = []
        for credential in credentials[i]:
            section_credentials.append(config.get(section, credential))
        collect_config[f"{section}"] = section_credentials
    
    return collect_config

In [198]:
if __name__ == "__main__": 
    sections = ["postgresql", "aws_s3", "csv", "stmp"]
    credential_names = [
        ["database", "user", "password", "host", "port"], 
        ["service_name", "region_name", "aws_access_key_id", "aws_secret_access_key", "s3_bucket"],
        ["source", "target"],
        ["smtp_port", "smtp_server", "smtp_sender_email", "smtp_receiver_email", "smtp_password"]
    ]
credentials = get_credentials(sections, credential_names)

In [200]:
# ensuring that the credentials were successful 
print("Getting credentials...")
credentials = get_credentials(sections, credential_names)
print("Collection of credentials were successful!")
database, user, password, host, port = credentials[sections[0]]
print("Successful Warehouse (Postgres SQL) credentials collection")
service_name, region_name, aws_access_key_id, aws_secret_access_key, s3_bucket = credentials[sections[1]]
print("Sucessful Datalake (AWS S3) credentials collection")
print(database, user, password, host, port, service_name, region_name, aws_access_key_id, aws_secret_access_key)
source, target = credentials[sections[2]]
print("Successful Local (CSV) credentials collection")
smtp_port, smtp_server, smtp_sender_email, smtp_receiver_email, smtp_password = credentials[sections[3]]
print("Sucessful Email (smtp) credentials collection")
print(database, user, password, host, port, service_name, region_name, aws_access_key_id, aws_secret_access_key, s3_bucket, source, target, smtp_port, smtp_server, smtp_sender_email, smtp_receiver_email, smtp_password)

Getting credentials...
Collection of credentials were successful!
Successful Warehouse (Postgres SQL) credentials collection
Sucessful Datalake (AWS S3) credentials collection
film_data postgres Slough20 127.0.0.1 5432 s3 eu-west-1 AKIA256HM6GCFDDX7EQE 1tKid1lgJSi+NPryKqS/dOHVeDCQZQ00n14y8520
Successful Local (CSV) credentials collection
Sucessful Email (smtp) credentials collection
film_data postgres Slough20 127.0.0.1 5432 s3 eu-west-1 AKIA256HM6GCFDDX7EQE 1tKid1lgJSi+NPryKqS/dOHVeDCQZQ00n14y8520 jreaydatalake data_files/IMDB-Movie-Data-Local.csv report output/report.xlsx 587 smtp.gmail.com jreay.data.eng@gmail.com jessicaareay@hotmail.com omnkaiawatqvatua


In [None]:
local_credentials = get_credentials(sections, credential_names)

Manual Extract 

In [169]:
# load local csv into dataframe
extracted_local_df = pd.read_csv(local_file_path) 
#load csv to dataframe to be loaded into postgres sql
to_load_warehouse_df = pd.read_csv('data_files/IMDB-Movie-Data-Postgres.csv') 

In [170]:
## load to postgres
# determine table name
table_name = 'IMDB_movie_data'

# Create an engine instance
alchemyEngine = create_engine(f'postgresql+psycopg2://{user}:{password}@{host}/{database}', pool_recycle=3600)
# Connect to PostgreSQL server
dbConnection = alchemyEngine.connect()
# Upload data to sql database
to_load_warehouse_df.to_sql(table_name, dbConnection, if_exists='fail')
print(f'PostgreSQL Table, "{table_name}", has been created successfully.')

dbConnection.close()

ValueError: Table 'IMDB_movie_data' already exists.

In [171]:
## extract from s3
s3_resource = boto3.resource(
    service_name = service_name,
    region_name = region_name, 
    aws_access_key_id = aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key
)

string_io = io.BytesIO()
s3_resource.Object(s3_bucket, "IMDB-Movie-Data-S3.csv").download_fileobj(string_io)
s3_contents = string_io.getvalue()

extracted_datalake_df = pd.read_csv(io.BytesIO(s3_contents))
extracted_datalake_df

Unnamed: 0,Title,Genre,Description,Director
0,Guardians of the Galaxy,"Action,Adventure,Sci-Fi",A group of intergalactic criminals are forced ...,James Gunn
1,Prometheus,"Adventure,Mystery,Sci-Fi","Following clues to the origin of mankind, a te...",Ridley Scott
2,Split,"Horror,Thriller",Three girls are kidnapped by a man with a diag...,M. Night Shyamalan
3,Sing,"Animation,Comedy,Family","In a city of humanoid animals, a hustling thea...",Christophe Lourdelet
4,Suicide Squad,"Action,Adventure,Fantasy",A secret government agency recruits some of th...,David Ayer
...,...,...,...,...
995,Secret in Their Eyes,"Crime,Drama,Mystery","A tight-knit team of rising investigators, alo...",Billy Ray
996,Hostel: Part II,Horror,Three American college students studying abroa...,Eli Roth
997,Step Up 2: The Streets,"Drama,Music,Romance",Romantic sparks occur between two dance studen...,Jon M. Chu
998,Search Party,"Adventure,Comedy",A pair of friends embark on a mission to reuni...,Scot Armstrong


In [172]:
## extract from postgres sql
# Create an engine instance
alchemyEngine = create_engine(f'postgresql+psycopg2://{user}:{password}@{host}/{database}', pool_recycle=3600);

# Connect to PostgreSQL server
dbConnection = alchemyEngine.connect();

# Read data from PostgreSQL database table and load into a DataFrame instance
sql = f"select * from \"{table_name}\""
extracted_warehouse_df = pd.read_sql(sql, dbConnection);
pd.set_option('display.expand_frame_repr', False);

if dbConnection:
    dbConnection.close()
print("PostgreSQL connection is closed")

PostgreSQL connection is closed


Automation of extract

In [238]:
sections = ["postgresql", "aws_s3", "csv"]
credential_names = [
    ["database", "user", "password", "host", "port"], 
    ["service_name", "region_name", "aws_access_key_id", "aws_secret_access_key", "s3_bucket"],
    ["source", "target"]
]

credentials = get_credentials(sections, credential_names)

warehouse_credentials = credentials[sections[0]]
datalake_credentials = credentials[sections[1]]
local_credentials = credentials[sections[2]]

In [239]:
def warehouse_extraction(table, database, user, password, host):
        try:
            # Create an engine instance
            alchemyEngine = create_engine(f'postgresql+psycopg2://{user}:{password}@{host}/{database}', pool_recycle=3600)

            # Connect to PostgreSQL server
            dbConnection = alchemyEngine.connect()

            # Read data from PostgreSQL database table and load into a DataFrame instance
            sql = f"select * from \"{table}\""
            warehouse_df = pd.read_sql(sql, dbConnection)

        finally:
            dbConnection.close()
        
        return warehouse_df

In [240]:
# Postgres
if None not in warehouse_credentials and "" not in warehouse_credentials:
    database, user, password, host, port = warehouse_credentials
    warehouse_df = warehouse_extraction("IMDB_movie_data", database, user, password, host)
else:
    raise Exception("Extraction failed: error with DB credentials")
    

In [249]:
def datalake_extraction(file_name, service_name, region_name, aws_access_key_id, aws_secret_access_key, s3_bucket):
    
    s3_resource = boto3.resource(
        service_name=service_name,
        region_name=region_name,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key
    )

    string_io = io.BytesIO()
    s3_resource.Object(s3_bucket, file_name).download_fileobj(string_io)
    s3_contents = string_io.getvalue()
    
    datalake_df = pd.read_csv(io.BytesIO(s3_contents))
    return datalake_df


In [250]:
# AWS S3
if None not in datalake_credentials and "" not in datalake_credentials: 
    service_name, region_name, aws_access_key_id, aws_secret_access_key, s3_bucket = datalake_credentials
    datalake_df = datalake_extraction("IMDB-Movie-Data-S3.csv", service_name, region_name, aws_access_key_id, aws_secret_access_key, s3_bucket)
else:
    raise Exception("Extraction failed: error with DB credentials")

In [243]:
def local_extraction(filepath):
    local_df = pd.read_csv(filepath)
    return local_df
    

In [244]:
local_df = local_extraction(local_credentials[0])

In [310]:

def extract():

    sections = ["postgresql", "aws_s3", "csv"]
    credential_names = [
        ["database", "user", "password", "host", "port"], 
        ["service_name", "region_name", "aws_access_key_id", "aws_secret_access_key", "s3_bucket"],
        ["source", "target"]
    ]

    credentials = get_credentials(sections, credential_names)

    warehouse_credentials = credentials[sections[0]]
    datalake_credentials = credentials[sections[1]]
    local_credentials = credentials[sections[2]]

   # Postgres
    if None not in warehouse_credentials and "" not in warehouse_credentials:
        database, user, password, host, port = warehouse_credentials
        warehouse_df = warehouse_extraction("IMDB_movie_data", database, user, password, host)
    else:
        raise Exception("Extraction failed: error with DB credentials")
    
    # AWS S3
    if None not in datalake_credentials and "" not in datalake_credentials: 
        service_name, region_name, aws_access_key_id, aws_secret_access_key, s3_bucket = datalake_credentials
        datalake_df = datalake_extraction("IMDB-Movie-Data-S3.csv", service_name, region_name, aws_access_key_id, aws_secret_access_key, s3_bucket)
    else:
        raise Exception("Extraction failed: error with DB credentials")
    
    local_df = local_extraction(local_credentials[0])

    return [warehouse_df, datalake_df, local_df]

def warehouse_extraction(table, database, user, password, host):
        try:
            # Create an engine instance
            alchemyEngine = create_engine(f'postgresql+psycopg2://{user}:{password}@{host}/{database}', pool_recycle=3600)

            # Connect to PostgreSQL server
            dbConnection = alchemyEngine.connect()

            # Read data from PostgreSQL database table and load into a DataFrame instance
            sql = f"select * from \"{table}\""
            warehouse_df = pd.read_sql(sql, dbConnection)

        finally:
            dbConnection.close()
        
        return warehouse_df

def datalake_extraction(file_name, service_name, region_name, aws_access_key_id, aws_secret_access_key, s3_bucket):
    
    s3_resource = boto3.resource(
        service_name=service_name,
        region_name=region_name,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key
    )

    string_io = io.BytesIO()
    s3_resource.Object(s3_bucket, file_name).download_fileobj(string_io)
    s3_contents = string_io.getvalue()
    
    datalake_df = pd.read_csv(io.BytesIO(s3_contents))
    
    return datalake_df

def local_extraction(filepath):
    local_df = pd.read_csv(filepath)
    return local_df
    

Manual Transform

In [173]:
# need to set index in order to join 
extracted_warehouse_df = extracted_warehouse_df.set_index('index')

In [188]:
# use merge function to join all three dataframes on index 
movie_data_df = pd.merge(pd.merge(extracted_datalake_df, extracted_local_df, left_index= True, right_index= True), extracted_warehouse_df, left_index= True, right_index= True)

In [175]:
# check joined dataframe- can see all three titles
movie_data_df.head()

Unnamed: 0,Title_x,Genre,Description,Director,Title_y,Rating,Votes,Revenue_Millions,Metascore,Title,Actors,Year,Runtime_Minutes
0,Guardians of the Galaxy,"Action,Adventure,Sci-Fi",A group of intergalactic criminals are forced ...,James Gunn,Guardians of the Galaxy,8.1,757074,333.13,76.0,Guardians of the Galaxy,"Chris Pratt, Vin Diesel, Bradley Cooper, Zoe S...",2014,121
1,Prometheus,"Adventure,Mystery,Sci-Fi","Following clues to the origin of mankind, a te...",Ridley Scott,Prometheus,7.0,485820,126.46,65.0,Prometheus,"Noomi Rapace, Logan Marshall-Green, Michael Fa...",2012,124
2,Split,"Horror,Thriller",Three girls are kidnapped by a man with a diag...,M. Night Shyamalan,Split,7.3,157606,138.12,62.0,Split,"James McAvoy, Anya Taylor-Joy, Haley Lu Richar...",2016,117
3,Sing,"Animation,Comedy,Family","In a city of humanoid animals, a hustling thea...",Christophe Lourdelet,Sing,7.2,60545,270.32,59.0,Sing,"Matthew McConaughey,Reese Witherspoon, Seth Ma...",2016,108
4,Suicide Squad,"Action,Adventure,Fantasy",A secret government agency recruits some of th...,David Ayer,Suicide Squad,6.2,393727,325.02,40.0,Suicide Squad,"Will Smith, Jared Leto, Margot Robbie, Viola D...",2016,123


In [176]:
#compare if the join was successful if the titles match 
if movie_data_df['Title_x'].equals(movie_data_df['Title_y']) & movie_data_df['Title_x'].equals(movie_data_df['Title']) & movie_data_df['Title_y'].equals(movie_data_df['Title']):
    print('The join was successful')
else: 
    print('The join was unsuccessful')

The join was successful


In [177]:
# check and remove unnecessary title columns 
if movie_data_df['Title_x'].equals(movie_data_df['Title_y']) & movie_data_df['Title_x'].equals(movie_data_df['Title']) & movie_data_df['Title_y'].equals(movie_data_df['Title']):
    movie_data_df = movie_data_df.drop(['Title_y', 'Title_x'], axis= 1)
else: 
    print('The join was unsuccessful')

In [178]:
#check new df
movie_data_df.head()

Unnamed: 0,Genre,Description,Director,Rating,Votes,Revenue_Millions,Metascore,Title,Actors,Year,Runtime_Minutes
0,"Action,Adventure,Sci-Fi",A group of intergalactic criminals are forced ...,James Gunn,8.1,757074,333.13,76.0,Guardians of the Galaxy,"Chris Pratt, Vin Diesel, Bradley Cooper, Zoe S...",2014,121
1,"Adventure,Mystery,Sci-Fi","Following clues to the origin of mankind, a te...",Ridley Scott,7.0,485820,126.46,65.0,Prometheus,"Noomi Rapace, Logan Marshall-Green, Michael Fa...",2012,124
2,"Horror,Thriller",Three girls are kidnapped by a man with a diag...,M. Night Shyamalan,7.3,157606,138.12,62.0,Split,"James McAvoy, Anya Taylor-Joy, Haley Lu Richar...",2016,117
3,"Animation,Comedy,Family","In a city of humanoid animals, a hustling thea...",Christophe Lourdelet,7.2,60545,270.32,59.0,Sing,"Matthew McConaughey,Reese Witherspoon, Seth Ma...",2016,108
4,"Action,Adventure,Fantasy",A secret government agency recruits some of th...,David Ayer,6.2,393727,325.02,40.0,Suicide Squad,"Will Smith, Jared Leto, Margot Robbie, Viola D...",2016,123


In [180]:
check_df = pd.read_csv('data_files/IMDB-Movie-Data.csv')

check_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 12 columns):
 #   Column              Non-Null Count  Dtype  
---  ------              --------------  -----  
 0   Rank                1000 non-null   int64  
 1   Title               1000 non-null   object 
 2   Genre               1000 non-null   object 
 3   Description         1000 non-null   object 
 4   Director            1000 non-null   object 
 5   Actors              1000 non-null   object 
 6   Year                1000 non-null   int64  
 7   Runtime (Minutes)   1000 non-null   int64  
 8   Rating              1000 non-null   float64
 9   Votes               1000 non-null   int64  
 10  Revenue (Millions)  872 non-null    float64
 11  Metascore           936 non-null    float64
dtypes: float64(3), int64(4), object(5)
memory usage: 93.9+ KB


In [181]:
report_df = movie_data_df.groupby('Genre').agg({'Revenue_Millions': 'sum', 'Votes': 'sum', 'Genre': 'count'}).rename(columns = {'Genre': 'Movie_count'}).sort_values(by = 'Revenue_Millions', ascending = False)

report_df.insert(0, 'Rank', range(1, 1 + len(report_df)))

report_df

Unnamed: 0_level_0,Rank,Revenue_Millions,Votes,Movie_count
Genre,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
"Action,Adventure,Sci-Fi",1,10461.51,18582076,50
"Animation,Adventure,Comedy",2,5754.75,5913065,27
"Action,Adventure,Fantasy",3,5248.29,7816851,27
"Adventure,Family,Fantasy",4,2201.47,2640649,14
Comedy,5,1941.81,3685529,32
...,...,...,...,...
"Animation,Drama,Romance",203,0.00,2421,1
"Comedy,Sci-Fi",204,0.00,26587,1
"Comedy,Western",205,0.00,31149,1
"Drama,Family",206,0.00,177602,1


In [182]:
writer = pd.ExcelWriter(config.get('csv', 'target'))
report_df.to_excel(writer, sheet_name= "Genre Ranking")
movie_data_df.to_excel(writer, sheet_name="Raw Data")
writer.close()

In [183]:

## testing connection to send email
import smtplib, ssl

smtp_server = 'smtp.gmail.com'
port = 465

sender = 'jreay.data.eng@gmail.com'
password_s = 'omnkaiawatqvatua'

context = ssl.create_default_context()

with smtplib.SMTP_SSL(smtp_server, port, context= context) as server:
    server.login(sender, password_s)
    print('It worked!')

It worked!


In [15]:
## send tester email without attachment 

# import smtplib, ssl
# smtp_port = config.get('stmp', 'smtp_port')
# smtp_server = config.get('stmp', 'smtp_server')
# sender_email = config.get('stmp', 'smtp_sender_email')
# receiver_email = config.get('stmp', 'smtp_receiver_email')
# smtp_password = config.get('stmp', 'smtp_password')
# message = """\
# Subject: Hi there 3
# Im sending an email through python code."""
# context = ssl.create_default_context()
# with smtplib.SMTP(smtp_server, smtp_port) as server:
#     server.ehlo() 
#     server.starttls(context=context)
#     server.ehlo() 
#     server.login(sender_email, smtp_password)
#     server.sendmail(sender_email, receiver_email, message)

In [184]:
# Import libraries 
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
import os.path

import datetime
today_date = datetime.datetime.now().strftime('%d %b %Y')


In [185]:
# add configs for smtp:
smtp_port = config.get('stmp', 'smtp_port')
smtp_server = config.get('stmp', 'smtp_server')
sender_email = config.get('stmp', 'smtp_sender_email')
receiver_email = config.get('stmp', 'smtp_receiver_email')
smtp_password = config.get('stmp', 'smtp_password')

In [186]:
from tabulate import tabulate
top_5_data = [[report_df.Rank[0], report_df.index[0], report_df.Revenue_Millions[0]],
[report_df.Rank[1], report_df.index[1], report_df.Revenue_Millions[1]],
[report_df.Rank[2], report_df.index[2], report_df.Revenue_Millions[2]],
[report_df.Rank[3], report_df.index[3], report_df.Revenue_Millions[3]],
[report_df.Rank[4], report_df.index[4], report_df.Revenue_Millions[4]]]
#print (tabulate(top_5_data, headers=["Rank", "Genre", "Revenue_Millions"]))

In [187]:
subject = f"Daily Profitable Genres Report for the {today_date}"
message = f'Hello, \n\n Please find the top 5 Ranked Genres for {today_date} as follows: \n\n {(tabulate(top_5_data, headers=["Rank", "Genre", "Revenue_Millions"]))} \n\n I have also attached a csv file with the rest of the data. \n\n Kind Regards.'

file_location =config.get('csv', 'target')

msg = MIMEMultipart()
msg['From'] = sender_email
msg['To'] = receiver_email
msg['Subject'] = subject

msg.attach(MIMEText(message, 'plain'))

# Setup the attachment
filename = os.path.basename(file_location)
attachment = open(file_location, "rb")
part = MIMEBase('application', 'octet-stream')
part.set_payload(attachment.read())
encoders.encode_base64(part)
part.add_header('Content-Disposition', "attachment; filename= %s" % filename)

# Attach the attachment to the MIMEMultipart object
msg.attach(part)

server = smtplib.SMTP(smtp_server, smtp_port)
server.starttls()
server.login(sender_email, smtp_password)
text = msg.as_string()
server.sendmail(sender_email, receiver_email, text)
server.quit()

(221,
 b'2.0.0 closing connection x24-20020a05600c189800b003b4727d199asm13404364wmp.15 - gsmtp')