### Connecting to MySQL and downloading the Songs Table to Python

### First task: Ensure you have downloaded and run the SQL file below in MySQL
### 07_(WC SQL) raw_Python_MySQL_transaction_record_tasks.sql

In [1]:
#Install the mysql module
!pip install sqlalchemy
!pip install mysql-connector-python
!pip install mysqlclient
!pip install PyMySQL

#Install the Postgresql module
!pip install psycopg2

Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable


In [2]:
import mysql.connector as dbconnect
import mysql.connector as mysqlconnector
from sqlalchemy import create_engine
import pandas as pd

In [3]:
# Query that pulls every column and row of the raw songs table.
# Ensure the database name is: Py_SONGS_Db
# Ensure the table name is: raw_Py_Songs_tb

query = '''
SELECT *
FROM Py_SONGS_Db.raw_Py_Songs_tb;
'''

In [None]:
# Masking my password: it is read at run time instead of being written in the notebook
import getpass

# Prompt the user for the password used by the MySQL connection cells
password = getpass.getpass("Enter your password: ")

In [None]:
# Open the MySQL connection (local server, database Py_SONGS_Db, user root)
# using the password entered in the prompt cell.
db_setup = dbconnect.connect(host='localhost',
                            database='Py_SONGS_Db',
                            user='root',
                            passwd=password)

In [None]:
# Run the query over the MySQL connection and load the result into a DataFrame
mysql_df = pd.read_sql(query,db_setup)
# Display the loaded frame
mysql_df

In [None]:
# Checking the column datatypes and non-null counts of mysql_df
mysql_df.info()

### Perform the Necessary Data Cleaning and Transformation

In [None]:
# (rows, columns) of the raw songs frame
mysql_df.shape

In [None]:
# Report the size of the songs table, one dimension per line
n_rows, n_cols = mysql_df.shape
print(f'The number of rows in the table: {n_rows}')
print(f'The number of columns in the table: {n_cols}')

In [None]:
# Missing values per column (isna is the same check as isnull)
mysql_df.isna().sum()

In [None]:
# Collect the column headers as a plain Python list
column_list = mysql_df.columns.tolist()  # same result as list(mysql_df.columns)
print(f'The column headers: {column_list}')

In [None]:
# Checking the distinct values present in the year_released column
mysql_df['year_released'].unique()

In [None]:
# How many records carry a release year of 0?

# Boolean mask: True for rows whose year_released is 0
invalid_yr = mysql_df['year_released'].eq(0)

# Number of rows selected by the mask
mysql_df.loc[invalid_yr, 'year_released'].count()

In [None]:
# Preview the first 5 records flagged with an invalid year
preview_columns = ['song_id', 'title', 'artist', 'album', 'year_released']
mysql_df.loc[invalid_yr, preview_columns].head()

In [None]:
# Deleting the records with an invalid (zero) release year.
# The condition is evaluated on the current frame, so re-running this cell
# simply finds nothing left to drop.
mysql_df.drop(mysql_df[mysql_df['year_released'] == 0].index, inplace=True)

# Confirming the deletion: re-check the condition on the cleaned frame instead of
# reusing the mask built before the drop (its index still lists the removed rows).
(mysql_df['year_released'] == 0).sum()

In [None]:
# Range check on the tempo column: print its smallest and largest value
tempo = mysql_df['tempo']
print(f"Minimum value for tempos: {tempo.min()}")
print(f"Maximum value for tempos: {tempo.max()}")

In [None]:
# How many records have a tempo of 0 or less?

# Boolean mask: True for rows whose tempo is <= 0
invalid_tempo = mysql_df['tempo'].le(0)

# Number of rows selected by the mask
mysql_df.loc[invalid_tempo, 'tempo'].count()

In [None]:
# Preview the first 5 records flagged with an invalid tempo
preview_columns = ['song_id', 'title', 'artist', 'album', 'tempo']
mysql_df.loc[invalid_tempo, preview_columns].head()

In [None]:
# Deleting the records with an invalid tempo (0 or less).
# The condition is evaluated on the current frame, so re-running this cell
# simply finds nothing left to drop.
mysql_df.drop(mysql_df[mysql_df['tempo'] <= 0].index, inplace=True)

# Confirming the deletion: re-check the condition on the cleaned frame instead of
# reusing the mask built before the drop (its index still lists the removed rows).
(mysql_df['tempo'] <= 0).sum()

In [None]:
# Range check on the loudness column: print its smallest and largest value
loudness = mysql_df['loudness']
print(f"Minimum value for loudness: {loudness.min()}")
print(f"Maximum value for loudness: {loudness.max()}")

In [None]:
# How many records have a loudness of 0 or more?

# Boolean mask: True for rows whose loudness is >= 0
invalid_loudness = mysql_df['loudness'].ge(0)

# Number of rows selected by the mask
mysql_df.loc[invalid_loudness, 'loudness'].count()

In [None]:
# Preview the first 5 records flagged with an invalid loudness
preview_columns = ['song_id', 'title', 'artist', 'album', 'loudness']
mysql_df.loc[invalid_loudness, preview_columns].head()

In [None]:
# Deleting the records with an invalid loudness (0 or more).
# The condition is evaluated on the current frame, so re-running this cell
# simply finds nothing left to drop.
mysql_df.drop(mysql_df[mysql_df['loudness'] >= 0].index, inplace=True)

# Confirming the deletion: re-check the condition on the cleaned frame instead of
# reusing the mask built before the drop (its index still lists the removed rows).
(mysql_df['loudness'] >= 0).sum()

In [None]:
# Summary statistics of the numeric columns, to sanity-check the cleaned data
mysql_df.describe()

In [None]:
# Checking the datatype of each column before casting
mysql_df.info()

In [None]:
# Collect the column headers as a plain Python list
column_list = mysql_df.columns.tolist()  # same result as list(mysql_df.columns)
print(f'The column headers: {column_list}')

In [None]:
# Applying the right datatypes to each column.
# The numeric widths are chosen so the cast cannot change a value:
#   * int16 tops out at 32,767, which a song_id can exceed, so int32 is used
#   * float16 keeps only ~3 significant digits (and tops out at 65,504), which
#     visibly alters durations, tempos and loudness; float32 keeps ~7 digits
string_list = ['title', 'artist', 'album']
for header in string_list:
    mysql_df[header] = mysql_df[header].astype('string')

integer_list = ['song_id', 'year_released']
for header in integer_list:
    mysql_df[header] = mysql_df[header].astype('int32')

float_list = ['duration', 'tempo', 'loudness']
for header in float_list:
    mysql_df[header] = mysql_df[header].astype('float32')

# Show the resulting schema
mysql_df.info()

In [None]:
# Trend of loudness over the years: one row per release year holding the
# mean loudness of the songs released that year.
mysql_df.pivot_table(index='year_released', values='loudness', aggfunc='mean')

In [None]:
# Line graph of the average loudness (y axis) per release year (x axis)
import matplotlib.pyplot as plt

pivot_table = mysql_df.pivot_table(index='year_released', values='loudness', aggfunc='mean')
avg_loudness = pivot_table['loudness']

line_style = dict(color='green', marker='*', linestyle='-')
plt.plot(avg_loudness, **line_style)
plt.xlabel('Years')
plt.ylabel('Loudness')
plt.title('Songs average loudness over the years')

In [None]:
# Checking the (rows, columns) of the cleaned dataframe
mysql_df.shape

### Exporting your Cleaned Dataframe into MySQL

In [None]:
# Export the cleaned songs frame to MySQL as table `clean_py_songs_tb`.
from urllib.parse import quote_plus

table_name = 'clean_py_songs_tb'
hostname="localhost"
# Same spelling as the database the raw table was read from: MySQL database
# names are case-sensitive on servers running with lower_case_table_names=0.
dbname="Py_SONGS_Db"
username="root"
# you can use "mysql+pymysql" or "mysql+mysqldb" instead of "mysql+mysqlconnector"
# quote_plus escapes characters such as '@', ':' or '/' in the password so they
# cannot be mistaken for URL delimiters.
engine = create_engine(f"mysql+mysqlconnector://{username}:{quote_plus(password)}@{hostname}/{dbname}",
                      pool_recycle=1, pool_timeout=57600)
try:
    # pandas opens, commits and releases its own connection when given an Engine
    mysql_df.to_sql(name=table_name,
            con=engine,
            if_exists='replace', # other options: 'append', 'fail'
            index=False)
finally:
    # Release the pooled connection(s) whether or not the export succeeded
    engine.dispose()

In [None]:
# Number of rows in the frame that was exported (compare with the table in MySQL)
mysql_df.shape[0]

## Data migration: export `mysql_df` into the PostgreSQL database

In [None]:
# Export the mysql_df to PostgreSQL Database (Database name: mysql_pysong_db)
# Please ensure the database (mysql_pysong_db) is already created in PostgreSQL

# Name of the destination table
table_name = 'clean_py_songs_tb'

In [None]:
import psycopg2 as pgconnect
import pandas as pd

In [None]:
# Masking my password: it is read at run time instead of being written in the notebook
import getpass


# Prompt the user for the password used by the PostgreSQL connection cells
passwd = getpass.getpass("Enter your password: ")

In [None]:
# Open a PostgreSQL connection (local server, database mysql_pysong_db, user postgres)
# using the password entered in the prompt cell.
pg_setup = pgconnect.connect(host='localhost',
                            #database='Py_largeTransactionDB',
                            database='mysql_pysong_db',
                            user='postgres',
                            #port=5432
                            password=passwd)
# no port is passed, so the driver's default (5432) is used

In [None]:
# Exporting the clean songs dataframe to PostgreSQL and timing the load
import time
from urllib.parse import quote_plus

row_total = len(mysql_df)
start_time = time.time()
table_name = 'clean_py_songs_tb'
hostname = "localhost"
dbname = "mysql_pysong_db"
username = "postgres"
port = 5432
# quote_plus escapes characters such as '@', ':' or '/' in the password so they
# cannot be mistaken for URL delimiters.
engine = create_engine(f"postgresql+psycopg2://{username}:{quote_plus(passwd)}@{hostname}:{port}/{dbname}")
try:
    mysql_df.to_sql(table_name,
                     con=engine,
                     if_exists='replace',  # drop and recreate the table on every run
                     index=False)
finally:
    # Release the pooled connection(s) whether or not the export succeeded
    engine.dispose()
stop_time = time.time()
print(f'The query with {row_total} rows took {round(stop_time-start_time)}secs')

In [None]:
# Closing the MySQL connection opened at the start of the songs section
db_setup.close()
print('Database connection close successfully')

### Connecting to PostgreSQL and downloading the Transaction Table to Python

### First task: Ensure you have downloaded and run the SQL file below in PostgreSQL
### 08_(WC SQL) raw_Python_PostgreSQL_transaction_record_tasks.sql

In [None]:
import psycopg2 as pgconnect
import pandas as pd

In [None]:
# Masking my password: it is read at run time instead of being written in the notebook
import getpass


# Prompt the user for the password used by the PostgreSQL connection cells
passwd = getpass.getpass("Enter your password: ")

In [None]:
# Open a PostgreSQL connection (local server, database mysql_pysong_db, user postgres).
# NOTE(review): the query cell below says the table lives in Py_largeTransactionDB,
# but that name is commented out here — confirm which database holds the raw table.
pg_setup = pgconnect.connect(host='localhost',
                            #database='Py_largeTransactionDB',
                            database='mysql_pysong_db',
                            user='postgres',
                            #port=5432
                            password=passwd)
# no port is passed, so the driver's default (5432) is used

In [None]:
# Query that pulls every column and row of the raw transaction table.
# Ensure the database name is: Py_largeTransactionDB
# Ensure the table name is: raw_Py_trans_records in your postgresql
# NOTE(review): the connection cell above connects to 'mysql_pysong_db', not
# Py_largeTransactionDB — confirm the table exists in the connected database.
query = '''
SELECT * 
FROM raw_Py_trans_records;
'''

In [None]:
# Run the query over the PostgreSQL connection and load the result into a DataFrame
pgsql_df = pd.read_sql(query,pg_setup)
# Display the loaded frame
pgsql_df

## Perform the Necessary Data Cleaning and Transformation

In [None]:
# (rows, columns) of the raw transactions frame
pgsql_df.shape

In [None]:
# Report the size of the transactions table, one dimension per line
n_rows, n_cols = pgsql_df.shape
print(f'The number of rows in the table: {n_rows}')
print(f'The number of columns in the table: {n_cols}')

In [None]:
# Checking the column datatypes and non-null counts of pgsql_df
pgsql_df.info()

In [None]:
# Collect the column headers as a plain Python list
column_list1 = pgsql_df.columns.tolist()  # same result as list(pgsql_df.columns)
print(f'The column headers: {column_list1}')

In [None]:
# Applying the right datatypes to each column.
# Identifiers, the timestamp and the labels are kept as text.
string_list = ['user_id', 'event_time', 'order_id', 'product_id', 'category_id', 'category_code', 'brand']
for header in string_list:
    pgsql_df[header] = pgsql_df[header].astype('string')

# price is stored as float64: float16 keeps only ~3 significant digits (the cents
# are lost) and turns every value above 65,504 into inf.
float_list = ['price']
for header in float_list:
    pgsql_df[header] = pgsql_df[header].astype('float64')

# Show the resulting schema
pgsql_df.info()

In [None]:
# Missing values per column (isna is the same check as isnull)
pgsql_df.isna().sum()

In [None]:
# Checking the first 5 records that contain at least one null/NaN value.
# isnull() alone only yields a True/False grid for every row; any(axis=1)
# reduces it to one flag per row so the matching records can be selected.
has_null = pgsql_df.isnull().any(axis=1)
pgsql_df[has_null].head(5)

In [None]:
# Keep only the rows that have no missing value, then preview the result
pgsql_df = pgsql_df.dropna(axis='index')
pgsql_df.head()

In [None]:
# Non-null value count per column after removing the incomplete rows
pgsql_df.count()

In [None]:
# Checking the (rows, columns) of the cleaned dataframe
pgsql_df.shape

### Exporting your Cleaned Dataframe into PostgreSQL

In [None]:
# Exporting the clean transactions dataframe by chunking, and timing the load
import time
from urllib.parse import quote_plus

row_total = len(pgsql_df)
start_time = time.time()
table_name = 'clean_Pytransact_records_tb'
hostname = "localhost"
dbname = "mysql_pysong_db"
username = "postgres"
port = 5432
# quote_plus escapes characters such as '@', ':' or '/' in the password so they
# cannot be mistaken for URL delimiters.
engine = create_engine(f"postgresql+psycopg2://{username}:{quote_plus(passwd)}@{hostname}:{port}/{dbname}")
try:
    pgsql_df.to_sql(table_name,
                     con=engine,
                     if_exists="replace",
                     chunksize=100_000,  # write in batches, same size as the MySQL export
                     index=False)
finally:
    # Release the pooled connection(s) whether or not the export succeeded
    engine.dispose()
stop_time = time.time()
print(f'The query with {row_total} rows took {round(stop_time-start_time)}secs')

## Data migration: export `pgsql_df` into the MySQL database

In [None]:
# Export the pgsql_df to MySQL Database (Database name: pg_pytransact_record_db)
# Please ensure the database (pg_pytransact_record_db) is already created in MySQL

# Destination table and database, read by the export cell that follows
table_name = 'clean_Pytransact_records_tb'
dbname="pg_pytransact_record_db"

In [None]:
# Export the cleaned transactions to MySQL in chunks and time the load,
# matching the other export cells.
import time
from urllib.parse import quote_plus

row_total = len(pgsql_df)
hostname="localhost"
username="root"
# you can use "mysql+pymysql" or "mysql+mysqlconnector" instead of "mysql+mysqldb"
# quote_plus escapes characters such as '@', ':' or '/' in the password so they
# cannot be mistaken for URL delimiters.
engine = create_engine(f"mysql+mysqldb://{username}:{quote_plus(password)}@{hostname}/{dbname}",
                      pool_recycle=1, pool_timeout=57600)
start_time = time.time()
try:
    # NOTE: 'append' adds to an existing table, so re-running this cell
    # inserts the same rows again.
    pgsql_df.to_sql(name=table_name,
            con=engine,
            if_exists='append',
            chunksize = 100_000,
            index=False)
finally:
    # Release the pooled connection(s) whether or not the export succeeded
    engine.dispose()
stop_time = time.time()
print(f'The query with {row_total} rows took {round(stop_time-start_time)}secs')

In [None]:
# Closing the PostgreSQL connection opened at the start of the transactions section
pg_setup.close()
print('Database connection close successfully')

### Working with Oracle

In [None]:
# Installing Oracle Software
!pip install cx_Oracle

In [None]:
# Import oracle
import cx_Oracle

In [None]:
# Masking my password: it is read at run time instead of being written in the notebook
import getpass

# Prompt the user for a password.
# NOTE: this rebinds `password`, replacing the MySQL password entered earlier.
password = getpass.getpass("Enter your password: ")
# Credentials are passed as separate arguments rather than glued into a
# "user/password@dsn" string, so a password containing '@' or '/' cannot be
# misread as part of the connect string.
conn = cx_Oracle.connect(user="system", password=password, dsn="localhost:1521/XE")
print("Connection created successfully")

In [None]:
# Connecting to SQL Server

In [None]:
# Install the the pyodbc module
!pip install pyodbc

In [None]:
# Import libraries
import pyodbc
import pandas as pd

In [None]:
# List the ODBC drivers that pyodbc reports on this machine
pyodbc.drivers()

In [None]:
# Insert every cleaned transaction into PostgreSQL with one parameterised
# INSERT statement, and time the load.
import time

# An earlier cell closes pg_setup; psycopg2 exposes that through `closed`
# (non-zero once closed), so reconnect with the same settings when needed.
if pg_setup.closed:
    pg_setup = pgconnect.connect(host='localhost',
                                 database='mysql_pysong_db',
                                 user='postgres',
                                 password=passwd)

row_total = len(pgsql_df)

# The statement is identical for every row, so it is built once
sql = """INSERT INTO 
            clean_Python_transaction_records(user_id, event_time, order_id, product_id,  
                                            category_id, category_code, brand, price) 
                                            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"""

# One tuple of strings per row, in the column order of the INSERT above.
# Each field goes through str(), as before; an empty frame yields an empty list.
insert_columns = ['user_id', 'event_time', 'order_id', 'product_id',
                  'category_id', 'category_code', 'brand', 'price']
column_arrays = [pgsql_df[name].to_numpy() for name in insert_columns]
values = [tuple(str(field) for field in row) for row in zip(*column_arrays)]

cursor = pg_setup.cursor()
start_time = time.time()
try:
    # executemany runs the statement once per tuple, so all rows are inserted
    # (previously only the values of the last row reached the database)
    cursor.executemany(sql, values)
    pg_setup.commit()
    stop_time = time.time()
    print(f'The query with {row_total} rows took {stop_time-start_time}')
finally:
    # closing the cursor and database connection, even if the insert failed
    cursor.close()
    pg_setup.close()