# Transformation step
* AWS MySQL connection setup, data loading, SQL transformation, and CSV export to the S3 bucket


* Mounts Google Drive

In [1]:
# Mount Google Drive at /content/drive; the local file path configured later
# in this notebook points inside this mount.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


* Installing Required Python Packages: boto3, botocore, mysql-connector-python


In [2]:
!pip install boto3 botocore mysql-connector-python

Collecting boto3
  Downloading boto3-1.34.1-py3-none-any.whl (139 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.3/139.3 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting botocore
  Downloading botocore-1.34.1-py3-none-any.whl (11.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m11.8/11.8 MB[0m [31m59.4 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting mysql-connector-python
  Downloading mysql_connector_python-8.2.0-cp310-cp310-manylinux_2_17_x86_64.whl (31.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m31.6/31.6 MB[0m [31m32.8 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting jmespath<2.0.0,>=0.7.1 (from boto3)
  Downloading jmespath-1.0.1-py3-none-any.whl (20 kB)
Collecting s3transfer<0.10.0,>=0.9.0 (from boto3)
  Downloading s3transfer-0.9.0-py3-none-any.whl (82 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m82.0/82.0 kB[0m [31m11.5 MB/s[0m eta [36m0:00:00[0m
Collecting protobu

* Importing the required libraries: boto3, mysql.connector, botocore's NoCredentialsError, pandas, io


In [3]:
import io
import os

import boto3
import mysql.connector
import pandas as pd
from botocore.exceptions import NoCredentialsError


* Declare the important configuration variables
* Note: the Transformation.sql file (saved as Transform.sql in the path below) is a group of SQL queries used to clean and filter the data
* The Transformation.sql file is attached with this code

In [1]:
# AWS credentials and region.
# Secrets are read from environment variables when they are set, so real keys
# never have to be saved inside the notebook. The placeholder strings are only
# a fallback and must be replaced by hand if the variables are not defined.
aws_access_key = os.environ.get('AWS_ACCESS_KEY_ID', 'your aws_access_key')
aws_secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY', 'your aws_secret_key')
aws_region = 'ap-south-1'

# Local path of the SQL transformation script.
# Note: the script is a group of SQL queries used to clean and filter the data;
# it is attached with this code.
local_file_path = '/content/drive/MyDrive/Colab Notebooks/Transform.sql'  # Replace with the path to your local file

# S3 bucket and object key configuration:
# change these values according to your needs.
s3_bucket_name = 'etlpro'
s3_object_key = 'IMDB_Movie_Ratings.xlsx'  # Excel workbook that is loaded into MySQL
s3_object_key_1 = 'Transform.sql'  # Replace with the desired S3 object key
s3_object_key_csv = 'Backup_imdb.csv'  # Replace with the desired S3 object key

# Database connection parameters.
# Host and password follow the same rule as the AWS secrets: environment first,
# placeholder (which must be replaced) otherwise.
host = os.environ.get('MYSQL_HOST', 'ec2 instance public ip')  # must be replaced
user = 'etl'
password = os.environ.get('MYSQL_PASSWORD', 'sql user password')
database = 'imdb'

In [47]:
def connect_to_mysql(host, user, password, database):
    """Open a MySQL connection and return it, or return None on failure.

    Args:
        host: Hostname or IP address of the MySQL server.
        user: MySQL user name.
        password: Password for ``user``.
        database: Database selected for the connection.

    Returns:
        The connection object when the connection is active, otherwise None.
        Every failure path prints a message explaining why None is returned.
    """
    try:
        connection = mysql.connector.connect(
            host=host,
            user=user,
            password=password,
            database=database
        )
        if connection.is_connected():
            print(f"Connected to MySQL database on host {host}")
            return connection
    except mysql.connector.Error as e:
        print(f"Error connecting to MySQL: {e}")
        return None

    # connect() returned without raising but the connection is not active.
    # Report it instead of silently falling through with an implicit None.
    print(f"Error connecting to MySQL: connection to host {host} is not active")
    return None


* Database Connection, S3 Connection, and MySQL Cursor Initialization

* Note: if the connection cannot be established, change the following setting on the MySQL server:
* sudo nano /etc/mysql/mysql.conf.d/mysqld.cnf
* bind-address            = 0.0.0.0
* After that, restart the MySQL service

In [48]:
# Create a connection object to establish a connection to the MySQL database
connection = connect_to_mysql(host, user, password, database)
if connection is None:
    # connect_to_mysql() signals failure by returning None; stop here with a
    # clear message instead of failing later on `None.cursor()`.
    raise RuntimeError("Could not connect to MySQL; check host, user, password and database.")

# Create an S3 client object to connect to S3
s3 = boto3.client('s3', aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key, region_name=aws_region)

# Create a cursor object to execute MySQL database queries
cursor = connection.cursor()

Connected to MySQL database on host 13.201.85.179


* Set MySQL Connection to Autocommit Mode

In [32]:
# Enable autocommit so each statement executed on this connection is committed
# as soon as it runs.
connection.autocommit = True

* Use Database and Drop Table (If needed)


In [36]:
# Only use when it's needed
cursor.execute("use imdb")
# IF EXISTS keeps this cell re-runnable: a plain DROP TABLE raises an error
# when movie_data does not exist yet (for example on the very first run).
cursor.execute("drop table if exists movie_data;")
print("drop table movie_data successfully")

drop table movie_data successfully


* Function to Create MySQL Table 'movie_data' in Database 'imdb'
* If you are working with different data or a different database, the table schema must be changed accordingly

In [37]:
def create_table(connection):
    """Create the ``movie_data`` table in the ``imdb`` database.

    The statements run on a cursor obtained from ``connection``, so the
    function works with whichever connection the caller passes in and does not
    rely on a notebook-level cursor being defined.

    Args:
        connection: Open MySQL connection; only ``cursor()`` is used.

    Returns:
        None. A success message, or the MySQL error, is printed.
    """
    try:
        table_cursor = connection.cursor()
        try:
            table_cursor.execute("use imdb")
            table_cursor.execute("""
            CREATE TABLE movie_data (
                movie_name VARCHAR(255),
                year_of_release INT,
                watch_time VARCHAR(30),
                rating DECIMAL(2, 1),
                metascore INT,
                votes INT,
                description TEXT
            )
        """)
        finally:
            # Release the cursor whether or not the statements succeeded.
            table_cursor.close()
        print("Table 'movie_data' created successfully.")

    except mysql.connector.Error as e:
        print(f"Error creating table: {e}")


* Run the create_table() Function

In [38]:
# Create the movie_data table in the imdb database.
create_table(connection)

Table 'movie_data' created successfully.


* Function to Load Data from S3 to MySQL Database

In [39]:
def load_data_to_mysql(connection, bucket_name, aws_access_key, aws_secret_key, region_name, s3_object_key):
    """Download an Excel workbook from S3 and insert its rows into ``movie_data``.

    The S3 client is built from the credential arguments and the inserts run
    on a cursor taken from ``connection``, so the function depends only on its
    parameters rather than on notebook-level ``s3`` / ``cursor`` variables.

    Args:
        connection: Open MySQL connection; ``cursor()`` and ``commit()`` are used.
        bucket_name: Name of the S3 bucket holding the workbook.
        aws_access_key: AWS access key id used to create the S3 client.
        aws_secret_key: AWS secret access key used to create the S3 client.
        region_name: AWS region used to create the S3 client.
        s3_object_key: Key of the workbook inside ``bucket_name``.

    Returns:
        None. Progress and MySQL / credential errors are printed.
    """
    insert_sql = """
                INSERT INTO movie_data (movie_name, year_of_release, watch_time, rating, metascore, votes, description)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
            """
    try:
        # Download data from S3
        s3_client = boto3.client(
            's3',
            aws_access_key_id=aws_access_key,
            aws_secret_access_key=aws_secret_key,
            region_name=region_name,
        )
        response = s3_client.get_object(Bucket=bucket_name, Key=s3_object_key)
        data = response['Body'].read()

        # Read the data into a DataFrame
        df = pd.read_excel(io.BytesIO(data))
        print(df)
        print(df.columns)

        load_cursor = connection.cursor()
        try:
            # Iterate over rows and insert data into MySQL
            for index, row in df.iterrows():
                # str() lets the clean-up work for cells the workbook stores as
                # numbers as well as for text cells ('2,830,101', '*****').
                votes = str(row['Votes']).replace(',', '')
                metascore = str(row['Meatscore of movie']).replace('*****', '0')
                load_cursor.execute(
                    insert_sql,
                    (row['Movie Name'], row['Year of Release'], row['Watch Time'], row['Movie Rating'], metascore, votes, row['Description']),
                )
        finally:
            load_cursor.close()

        # Persist the inserts even if autocommit was not enabled on the connection.
        connection.commit()

        print("Data loaded into MySQL successfully.")
    except (mysql.connector.Error, NoCredentialsError) as e:
        print(f"Error loading data into MySQL: {e}")




* Run the load_data_to_mysql() Function

In [40]:
# Read the workbook stored under s3_object_key in s3_bucket_name and insert its rows into movie_data
load_data_to_mysql(connection, s3_bucket_name, aws_access_key, aws_secret_key, aws_region, s3_object_key)

                                           Movie Name  Year of Release  \
0                         1. The Shawshank Redemption             1994   
1                                    2. The Godfather             1972   
2                                  3. The Dark Knight             2008   
3                                 4. Schindler's List             1993   
4    5. The Lord of the Rings: The Return of the King             2003   
..                                                ...              ...   
245                                      246. Stalker             1979   
246                             247. 12 Years a Slave             2013   
247                                  248. Gran Torino             2008   
248          249. Lock, Stock and Two Smoking Barrels             1998   
249                                      250. Warrior             2011   

    Watch Time  Movie Rating Meatscore of movie      Votes  \
0       2h 22m           9.3                 82  

* Function to Upload a Local File (Transformation SQL script) to an AWS S3 Bucket


In [41]:
def upload_to_s3(local_file, bucket, s3_key):
    """Send a local file to an S3 bucket through the notebook-level ``s3`` client.

    Args:
        local_file: Path of the file to upload.
        bucket: Name of the destination bucket.
        s3_key: Object key to store the file under.

    Returns:
        None. The outcome (success, missing file, or missing credentials) is
        reported with a printed message.
    """
    try:
        s3.upload_file(local_file, bucket, s3_key)
    except FileNotFoundError:
        print(f"The file '{local_file}' was not found.")
    except NoCredentialsError:
        print("Credentials not available or incorrect.")
    else:
        # Reached only when the upload raised nothing.
        print(f"File '{local_file}' uploaded to '{bucket}' with key '{s3_key}' successfully.")

* Run the upload_to_s3() Function

In [42]:
# Upload the local SQL script (local_file_path) to s3_bucket_name under the key s3_object_key_1
upload_to_s3(local_file_path, s3_bucket_name, s3_object_key_1)

File '/content/drive/MyDrive/Colab Notebooks/Transform.sql' uploaded to 'etlpro' with key 'Transform.sql' successfully.


* Function to Execute SQL Transformation Script from S3 on MySQL Database


In [43]:
def execute_sql_script_from_s3(connection, s3_bucket_name, aws_access_key, aws_secret_key, aws_region, s3_object_key):
    """Download a SQL script from S3 and run every statement in it on MySQL.

    The script is fetched from the key given in ``s3_object_key`` (not from a
    notebook-level variable), using an S3 client built from the credential
    arguments, and executed on a cursor taken from ``connection``.

    Args:
        connection: Open MySQL connection; ``cursor()`` and ``commit()`` are used.
        s3_bucket_name: Name of the S3 bucket holding the script.
        aws_access_key: AWS access key id used to create the S3 client.
        aws_secret_key: AWS secret access key used to create the S3 client.
        aws_region: AWS region used to create the S3 client.
        s3_object_key: Key of the SQL script inside ``s3_bucket_name``.

    Returns:
        None. A success message, or the MySQL / credential error, is printed.
    """
    try:
        # Download SQL script from S3
        s3_client = boto3.client(
            's3',
            aws_access_key_id=aws_access_key,
            aws_secret_access_key=aws_secret_key,
            region_name=aws_region,
        )
        response = s3_client.get_object(Bucket=s3_bucket_name, Key=s3_object_key)
        sql_script = response['Body'].read().decode('utf-8')

        # Execute the SQL script.
        # The script holds several statements, so it is run with multi=True and
        # every result is consumed; leaving results unread can make later
        # queries on the same connection fail.
        # NOTE(review): multi=True is the Connector/Python 8.x API (8.2.0 is the
        # version in this notebook's install log) - confirm against the
        # installed release if the connector is upgraded.
        script_cursor = connection.cursor()
        try:
            for result in script_cursor.execute(sql_script, multi=True):
                if result.with_rows:
                    result.fetchall()
        finally:
            script_cursor.close()

        # Persist the changes even if autocommit was not enabled on the connection.
        connection.commit()

        print("Transformation Script executed successfully!")

    except (mysql.connector.Error, NoCredentialsError) as e:
        print(f"Error executing script: {e}")


* Run the execute_sql_script_from_s3() Function

In [44]:
# Fetch the SQL script stored under s3_object_key_1 and execute it on the MySQL connection.
execute_sql_script_from_s3(connection, s3_bucket_name, aws_access_key, aws_secret_key, aws_region, s3_object_key_1)

Transformation Script executed successfully!


* Function to Convert MySQL Data to CSV and Upload to S3


In [45]:
def convert_to_csv_upload_to_s3(connection, s3_bucket_name, aws_access_key, aws_secret_key, aws_region, s3_object_key_csv):
    """Export the ``movie_data`` table as CSV and upload it to S3.

    The query runs on a fresh cursor taken from ``connection`` and the upload
    uses an S3 client built from the credential arguments, so the function
    depends only on its parameters rather than on notebook-level ``cursor`` /
    ``s3`` variables.

    Args:
        connection: Open MySQL connection; only ``cursor()`` is used.
        s3_bucket_name: Name of the destination S3 bucket.
        aws_access_key: AWS access key id used to create the S3 client.
        aws_secret_key: AWS secret access key used to create the S3 client.
        aws_region: AWS region used to create the S3 client.
        s3_object_key_csv: Object key the CSV is stored under.

    Returns:
        None. The DataFrame, progress messages, and MySQL / credential errors
        are printed.
    """
    try:
        export_cursor = connection.cursor()
        try:
            # Execute the SQL query
            export_cursor.execute('use imdb;')
            export_cursor.execute('SELECT * FROM movie_data;')

            # Fetch data from the executed query
            rows = export_cursor.fetchall()
            columns = [column[0] for column in export_cursor.description]
        finally:
            export_cursor.close()

        df = pd.DataFrame.from_records(rows, columns=columns)

        # Print the DataFrame
        print(df)

        print("SQL Script executed successfully!")

        # Save DataFrame to CSV
        csv_content = df.to_csv(index=False)

        # Upload CSV to S3 as explicit UTF-8 bytes
        s3_client = boto3.client(
            's3',
            aws_access_key_id=aws_access_key,
            aws_secret_access_key=aws_secret_key,
            region_name=aws_region,
        )
        s3_client.put_object(Body=csv_content.encode('utf-8'), Bucket=s3_bucket_name, Key=s3_object_key_csv)

        print("CSV File uploaded to S3 successfully!")

    except (mysql.connector.Error, NoCredentialsError) as e:
        print(f"Error executing script: {e}")

* Run the convert_to_csv_upload_to_s3() Function
* If this function fails with an error, re-run the database connection cell above and then run it again; that resolves the problem

In [49]:
# Export the movie_data table as CSV to s3_bucket_name under the key s3_object_key_csv.
convert_to_csv_upload_to_s3(connection, s3_bucket_name, aws_access_key, aws_secret_key, aws_region, s3_object_key_csv)

                                           movie_name  year_of_release  \
0                         1. The Shawshank Redemption             1994   
1                                    2. The Godfather             1972   
2                                  3. The Dark Knight             2008   
3                                 4. Schindler's List             1993   
4    5. The Lord of the Rings: The Return of the King             2003   
..                                                ...              ...   
202                                      246. Stalker             1979   
203                             247. 12 Years a Slave             2013   
204                                  248. Gran Torino             2008   
205          249. Lock, Stock and Two Smoking Barrels             1998   
206                                      250. Warrior             2011   

    watch_time rating  metascore    votes avg_rating   id  
0       2h 22m    9.3         82  2830101       45.

* Closing MySQL Database Connection

In [50]:
# Close the MySQL connection opened earlier in the notebook.
connection.close()