# Create MySQL Database for Data scraped with Selenium

In [None]:
# Load the lab_black extension (formats code cells with black)
%load_ext lab_black
# Load autoreload and set mode 2: reload all modules before executing code
%load_ext autoreload
%autoreload 2

In [None]:
import os
from glob import glob
from urllib.parse import quote

import mysql.connector
import pandas as pd
import requests
from prefect import Flow, task, unmapped
from prefect.tasks.mysql import MySQLFetch, MySQLExecute
from sqlalchemy import create_engine

<a id="table-of-contents"></a>

## [Table of Contents](#table-of-contents)
0. [About](#about)
1. [User Inputs](#user-inputs)
2. [MySQL server and database pre-requisites](#mysql-server-and-database-pre-requisites)
   - 2.1. [Download SSL Certificates](#download-ssl-certificates)
   - 2.2. [Specify Database Connection string](#specify-database-connection-string)
3. [Perform Administrative Actions on the Server](#perform-administrative-actions-on-the-server)
4. [Create and Populate Tables](#create-and-populate-tables)
   - 4.1. [Load Data with Python to decide on MySQL datatypes](#load-data-with-python-to-decide-on-mysql-datatypes)
   - 4.2. [Specify MySQL commands to create table and alter datatypes](#specify-mysql-commands-to-create-table-and-alter-datatypes)
   - 4.3. [Create table and append `DataFrame` to table](#create-table-and-append-`dataframe`-to-table)
   - 4.4. [Change column datatypes](#change-column-datatypes)
5. [Drop Tables](#drop-tables)

<a id="about"></a>

## 0. [About](#about)

In this notebook, we'll download the scraped search results and listings, scraped using Selenium, from cloud storage and clean both datasets. We'll then merge the cleaned datasets and append this clean version to a MySQL database table.

**Workflow**

The intended workflow that these actions will comprise is the following
- retrieve raw scraped data (search results and listings) from cloud storage
- process (clean) the raw data
- append the clean data to a MySQL database

**Assumptions**

The above workflow has the following assumptions
- data was retrieved by web-scraping with Selenium (eg. using `2_*.ipynb`)
  - scraped data is stored in files with names `p*_*.csv` (listings) and `search_results_page_*_*.parquet.gzip` (search results) in `data/raw/selenium`
- raw scraped data is uploaded (from `data/raw/selenium`) to cloud storage (eg. using `8_*.ipynb`)

In order to [orchestrate](https://www.qubole.com/blog/apache-airflow-tutorial-etl-elt-workflow-orchestration-made-easy/) this process of downloading (Extract), cleaning (Transform) and loading data into a database (Load), a [workflow management](https://fivetran.com/blog/data-orchestration-explained-no-diy) tool (Prefect) will be used. This is optional and was not a requirement, but has been used here as was done in the notebooks to upload (`8_*.ipynb`) and download (`9_*.ipynb`) data.

This notebook will use Prefect to perform each of the E-T-L steps of the workflow described above.

**Notes**
1. The MySQL database used here is assumed to be configured to [use SSL connections between the server and client](https://www.datasunrise.com/blog/professional-info/mitm-sql_server-protection/). In that case, the SSL connection configuration might [use](https://community.bitnami.com/t/how-can-i-connect-to-a-mysql-database-that-requires-ssl/56589) ([example manual setup](https://www.howtoforge.com/tutorial/how-to-enable-ssl-and-remote-connections-for-mysql-on-centos-7/)) a [SSL certificate from DigiCert](https://www.digicert.com/kb/digicert-root-certificates.htm).

**Requirements**
1. As mentioned in the [`README.md`](https://github.com/elsdes3/steam-games-web-scraping-eda/blob/master/README.md), the following environment variables must be defined in the calling shell before running this notebook
   ```bash
   export MYSQL_ADMIN_USER_NAME=<credential-here>
   export MYSQL_ADMIN_USER_PWD=<credential-here>
   export MYSQL_NON_ADMIN_USER_NAME=<credential-here>
   export MYSQL_NON_ADMIN_USER_PWD=<credential-here>
   export MYSQL_HOST=localhost
   export MYSQL_PORT=3306
   ```

<a id="user-inputs"></a>

## 1. [User Inputs](#user-inputs)

Define variables that can be changed when running this notebook

In [None]:
# Directory the notebook is run from; the data paths are built relative to it
PROJ_ROOT_DIR = os.getcwd()
# MySQL user names and server location are read from environment variables
admin_user = os.getenv("MYSQL_ADMIN_USER_NAME")
non_admin_user = os.getenv("MYSQL_NON_ADMIN_USER_NAME")
host = os.getenv("MYSQL_HOST")
# os.getenv returns None when MYSQL_PORT is unset, in which case int() raises
# a TypeError here
port = int(os.getenv("MYSQL_PORT"))

In [None]:
# name of the MySQL database to create and use
database_name = "mydbdemo"

# URL from which the SSL root certificate is downloaded
ssl_cert_url = "https://www.digicert.com/CACerts/BaltimoreCyberTrustRoot.crt.pem"

# set to False to create the database; set to True if it already exists
is_database_exists = False

# set to False to create the non-admin user; set to True if it already exists
is_user_exists = False

# name of database table to hold scraped data
table_name = "listings"

# whether to drop the database table at the end of the notebook
drop_table = True

In [None]:
# Path to data/raw
data_dir = os.path.join(PROJ_ROOT_DIR, "data")
raw_data_dir = os.path.join(data_dir, "raw")

# Path to data/raw/selenium
selenium_files_dir = os.path.join(raw_data_dir, "selenium")

# List of listings CSV files created using selenium
fpaths_selenium = glob(os.path.join(selenium_files_dir, "p*_*.csv"))

# List of search results files (gzipped parquet) created by Selenium
selenium_search_results_pages = glob(
    os.path.join(selenium_files_dir, "search_results_page_*_*.parquet.gzip")
)

# Path to locally downloaded SSL certificate
ssl_cert_path = os.path.join(raw_data_dir, "BaltimoreCyberTrustRoot.crt.pem")

# Non-Admin user name without @<host-name>
# (the suffix is only stripped when the host is not "localhost")
non_admin_user_name = (
    non_admin_user.split("@")[0] if host != "localhost" else non_admin_user
)

# Admin and non-admin user passwords
admin_user_pwd = os.getenv("MYSQL_ADMIN_USER_PWD")
non_admin_user_pwd = os.getenv("MYSQL_NON_ADMIN_USER_PWD")

# Keyword arguments unpacked into the Prefect MySQLExecute / MySQLFetch tasks
# NOTE(review): `port` is not passed here, so those tasks presumably use their
# default port - confirm if MYSQL_PORT is ever set to a non-default value
conn_dict = dict(
    db_name=database_name,
    user=non_admin_user,
    password=non_admin_user_pwd,
    host=host,
)

In [None]:
def summarize_df(df, col_dtype_to_show):
    """Display a one-row-per-column summary of a ``DataFrame``.

    Parameters
    ----------
    df : pandas.DataFrame
        Data to summarize.
    col_dtype_to_show : str
        ``"object"`` summarizes the object-dtype columns and reports the
        maximum string length; any other value summarizes the remaining
        columns and reports the maximum value.

    Returns
    -------
    None
        The summary (max, dtype, number of missing values and one
        non-missing example value per column) is shown with ``display``.
    """
    if col_dtype_to_show == "object":
        # Get string dtype columns
        cols_to_show = list(df.select_dtypes("object"))
        # Get max length of string; values are cast to str first, so a
        # missing value is measured as the string "nan"
        df_max = (
            df[cols_to_show]
            .astype(str)
            .apply(lambda x: x.str.len().max(), axis=0)
            .rename("max_length")
            .to_frame()
        )
    else:
        # Get non-string (numerical) dtype columns, keeping the DataFrame's
        # own column order (a set difference would scramble it)
        object_cols = set(df.select_dtypes("object"))
        cols_to_show = [col for col in df if col not in object_cols]
        # Get max numerical value
        df_max = df[cols_to_show].max().rename("max_value").to_frame()

    df_shown = df[cols_to_show]
    complete_rows = df_shown.dropna(how="any")
    if not complete_rows.empty:
        # A single random row with no missing values. `.iloc[0]` is used
        # rather than `.squeeze()` so that a Series is returned even when
        # only one column is being summarized.
        example_values = complete_rows.sample(1).iloc[0]
    else:
        # No row is complete across every shown column, so sampling a whole
        # row would raise; pick one non-missing value per column instead
        example_values = pd.Series(
            {
                col: (
                    df_shown[col].dropna().sample(1).iloc[0]
                    if df_shown[col].notna().any()
                    else None
                )
                for col in cols_to_show
            },
            dtype="object",
        )

    # Left-join every per-column statistic onto the max table by column name
    display(
        df_max.join(df_shown.dtypes.rename("dtype"))
        .join(df_shown.isna().sum().rename("num_missing"))
        .join(example_values.rename("single_non_nan_value"))
    )


def show_sql_df(query, cursor, cnx=None, table_output=False):
    """Run a SQL statement on a DB-API cursor and optionally tabulate it.

    Parameters
    ----------
    query : str
        SQL statement to execute.
    cursor : DB-API cursor
        Cursor on which the statement is executed.
    cnx : DB-API connection, optional
        When given (truthy), ``commit()`` is called after execution.
    table_output : bool
        When True, fetch all rows and return them as a ``DataFrame``
        (also displayed when at least one row was fetched).

    Returns
    -------
    pandas.DataFrame
        The fetched rows, or an empty ``DataFrame`` when no output was
        requested or no rows were returned.
    """
    cursor.execute(query)
    if cnx:
        cnx.commit()

    # Nothing to tabulate unless the caller asked for the result set
    if not table_output:
        return pd.DataFrame()

    # The first item of each description entry is the column name
    header = [col_desc[0] for col_desc in cursor.description]
    rows = cursor.fetchall()
    if not rows:
        return pd.DataFrame()

    result = pd.DataFrame.from_records(rows, columns=header)
    display(result)
    return result


def sqlalchemy_show_query_df(
    query, db_connection, use_exception_handling=False, show_output=False
):
    """Read the result of a SQL query into a ``DataFrame``.

    Parameters
    ----------
    query : str
        SQL query passed to ``pandas.read_sql``.
    db_connection : connectable
        Connection or engine passed as ``con`` to ``pandas.read_sql``.
    use_exception_handling : bool
        When True, any exception raised while reading is printed and an
        empty ``DataFrame`` is returned instead of propagating the error.
    show_output : bool
        When True, a non-empty result is also displayed.

    Returns
    -------
    pandas.DataFrame
        The query result (empty when a handled error occurred).
    """
    if use_exception_handling:
        try:
            result = pd.read_sql(query, con=db_connection)
        except Exception as err:
            # Best-effort mode: report the problem and fall back to no data
            print(str(err))
            result = pd.DataFrame()
    else:
        result = pd.read_sql(query, con=db_connection)

    if show_output and not result.empty:
        display(result)
    return result

In [None]:
def load_concatenate_listings_search_results(
    listing_filepaths, selenium_search_results_pages
):
    """Clean and merge listings and search results datasets.

    Parameters
    ----------
    listing_filepaths : list of str
        Paths to the listings CSV files.
    selenium_search_results_pages : list of str
        Paths to the search results parquet files.

    Returns
    -------
    pandas.DataFrame
        Search results left-joined to listings on the title, with blank
        dates set to missing, column names cleaned (spaces replaced by
        underscores, ``_listings`` suffix on attributes scraped from both
        sources) and the threshold columns trimmed at the first ``<br>``.
    """
    # Process Listings files
    df_listings = (
        pd.concat(
            [pd.read_csv(f) for f in listing_filepaths],
            ignore_index=True,
        )
        .dropna(subset=["Title"])
        .sort_values(by=["page_num", "listing_num"])
        .reset_index(drop=True)
    )

    # Process Search Results files and drop duplicated search results
    # - duplicates appeared on multiple page numbers when the Selenium
    #   webdriver queried the Steam store search
    df_search_results = (
        pd.concat(
            [
                pd.read_parquet(
                    selenium_search_results_page,
                    engine="auto",
                )
                for selenium_search_results_page in selenium_search_results_pages
            ],
            ignore_index=True,
        )
        .astype({"page": int})
        .dropna(subset=["title"])
        .sort_values(by=["page", "listing_counter"])
        .drop_duplicates(subset=["title", "url"])
        .reset_index(drop=True)
    )

    # Merge listings with search results
    dfm = df_search_results.merge(
        df_listings,
        left_on=["title"],
        right_on=["Title"],
        how="left",
    )

    # Replace blank strings in date columns by NaN. An explicit NaN is used
    # instead of None because, on older pandas versions, `replace("", None)`
    # forward-fills the blanks rather than inserting a missing value.
    for date_col in ["Early Access Release Date", "Release Date", "release_date"]:
        dfm[date_col] = dfm[date_col].replace("", float("nan"))

    # Add suffix to the column name for attributes that were scraped from both the
    # search results and the listings (add suffix to column scraped from the listings)
    multi_scraped_cols = ["Title", "Release Date", "platforms"]
    dfm = dfm.rename(columns={c: c + "_listings" for c in multi_scraped_cols})

    # Clean column names
    dfm.columns = dfm.columns.str.replace(" ", "_")

    # Clean string columns: keep only the text before the first "<br>"
    for threshold_col in ["pct_overall_threshold", "pct_overall_threshold_lang"]:
        dfm[threshold_col] = dfm[threshold_col].str.split("<br>", expand=True)[0]
    return dfm

In [None]:
@task
def run_sql_query(conn_dict, query, ssl_cert_path):
    """Prefect task: execute one SQL statement with ``MySQLExecute``.

    ``conn_dict`` is unpacked into the ``MySQLExecute`` constructor, and
    ``ssl_cert_path`` is forwarded inside the ``ssl`` options dictionary.
    Returns whatever ``MySQLExecute.run`` returns.
    """
    # NOTE(review): confirm that MYSQL_OPT_SSL_CAPATH is the key the
    # underlying MySQL driver expects for a CA certificate file
    ssl_options = dict(MYSQL_OPT_SSL_CAPATH=ssl_cert_path)
    executor = MySQLExecute(**conn_dict)
    return executor.run(query=query, ssl=ssl_options)


@task
def extract_transform(listing_filepaths, selenium_search_results_pages):
    """Prefect task: load the raw files and return the cleaned, merged data.

    Thin wrapper that delegates to
    ``load_concatenate_listings_search_results`` with the same arguments.
    """
    merged = load_concatenate_listings_search_results(
        listing_filepaths, selenium_search_results_pages
    )
    return merged


@task
def load_my_data(df, table_name, db_connection):
    """Prefect task: append ``df`` to ``table_name`` via ``db_connection``.

    Rows are appended to the existing table and the ``DataFrame`` index is
    not written as a column.
    """
    df.to_sql(
        name=table_name,
        con=db_connection,
        if_exists="append",
        index=False,
    )


@task
def query_database(conn_dict, query, table_name, ssl_cert_path):
    """Prefect task: run ``query`` and return its rows as a ``DataFrame``.

    The column labels are taken from ``SHOW COLUMNS FROM <table_name>``,
    so they only line up with the rows when ``query`` returns every column
    of ``table_name`` in table order (e.g. ``SELECT *``).
    """
    ssl_options = dict(MYSQL_OPT_SSL_CAPATH=ssl_cert_path)
    fetcher = MySQLFetch(**conn_dict)

    # Rows returned by the caller's query
    rows = fetcher.run(query=query, ssl=ssl_options, fetch="all")

    # One row per table column; the first item of each row is the column name
    column_rows = fetcher.run(
        query=f"SHOW COLUMNS FROM {table_name}",
        ssl=ssl_options,
        fetch="all",
    )
    header = [column_row[0] for column_row in column_rows]

    return pd.DataFrame.from_records(rows, columns=header)

<a id="mysql-server-and-database-pre-requisites"></a>

## 2. [MySQL server and database pre-requisites](#mysql-server-and-database-pre-requisites)

<a id="download-ssl-certificates"></a>

### 2.1. [Download SSL Certificates](#download-ssl-certificates)

Since [SSL was enforced on the MySQL server VM](https://docs.microsoft.com/en-us/azure/mysql/howto-configure-ssl#using-azure-cli), follow [instructions on Azure documentation](https://docs.microsoft.com/en-us/azure/mysql/howto-configure-ssl#step-1-obtain-ssl-certificate) to download the SSL certificate required for connecting with
- `sqlalchemy`
  - via `create_engine`
  - docs ([1](https://docs.sqlalchemy.org/en/14/core/engines.html), [2](https://docs.sqlalchemy.org/en/14/dialects/mysql.html#ssl-connections))
    - when querying the database
- `mysql-connector-python`
  - via the `ssl_ca` keyword in `.connect()`
  - docs ([1](https://docs.microsoft.com/en-us/azure/mysql/howto-configure-ssl#python-mysqlconnector-python), [2](https://dev.mysql.com/doc/connector-python/en/connector-python-example-connecting.html), [3](https://dev.mysql.com/doc/connector-python/en/connector-python-connectargs.html))

In [None]:
# Download the SSL certificate once; later runs reuse the local copy
if not os.path.exists(ssl_cert_path):
    # Make sure the destination directory exists before writing into it
    os.makedirs(os.path.dirname(ssl_cert_path), exist_ok=True)
    # A timeout prevents the notebook from hanging on an unresponsive server
    response = requests.get(ssl_cert_url, timeout=30)
    # Fail on HTTP errors so that an error page is never saved (and then
    # silently reused on later runs) as if it were the certificate
    response.raise_for_status()
    with open(ssl_cert_path, "wb") as file:
        file.write(response.content)

In [None]:
print(ssl_cert_path)
# Connection arguments pointing at the downloaded CA certificate file
ssl_args = {"ssl_ca": ssl_cert_path}

<a id="specify-database-connection-string"></a>

### 2.2. [Specify Database Connection string](#specify-database-connection-string)

In [None]:
# Percent-encode the credentials so that characters with a special meaning in
# a URL (such as "@", ":" or "/") cannot corrupt the connection string;
# SQLAlchemy decodes them again when it parses the URL
db_connection_str = (
    f"mysql+mysqlconnector://{quote(non_admin_user, safe='')}:"
    f"{quote(non_admin_user_pwd, safe='')}@"
    f"{host}:{port}/{database_name}"
)
# Engine for the non-admin user; the CA certificate is passed to the driver
db_connection = create_engine(db_connection_str, connect_args=ssl_args)
print(db_connection)

This will be used by SQLAlchemy.

<a id="perform-administrative-actions-on-the-server"></a>

## 3. [Perform Administrative Actions on the Server](#perform-administrative-actions-on-the-server)

Perform one-time administrative actions on the server

In [None]:
# Connect to the server as the admin user (no database is selected), passing
# the downloaded CA certificate for the SSL connection
mydb = mysql.connector.connect(
    host=host,
    user=admin_user,
    password=admin_user_pwd,
    ssl_ca=ssl_cert_path,
    port=port,
)
# Cursor used to run the administrative statements
cur = mydb.cursor()

Create the database

In [None]:
%%time
# Only issue the CREATE DATABASE statement when the database is flagged as
# not existing yet
if not is_database_exists:
    query = f"CREATE DATABASE IF NOT EXISTS {database_name}"
    show_sql_df(query, cur)

Create a user and grant the user all privileges

In [None]:
%%time
# Create the non-admin user (for any host, '%'), grant it all privileges on
# the database and flush the privileges
# NOTE(review): the password is interpolated directly into the statement, so
# a quote character in it would break the SQL - confirm this is acceptable
if not is_user_exists:
    for query in [
        f"CREATE USER '{non_admin_user_name}'@'%' IDENTIFIED BY '{non_admin_user_pwd}'",
        f"GRANT ALL PRIVILEGES ON {database_name} . * TO '{non_admin_user_name}'@'%'",
        "FLUSH PRIVILEGES",
    ]:
        show_sql_df(query, cur)

Show whether the MySQL server is configured to use a SSL connection between the server and client application ([1](https://stackoverflow.com/a/37319152/4057186), [2](https://dba.stackexchange.com/a/40176/177332))

In [None]:
%%time
# Display the SSL cipher status variable of the current connection
query = "SHOW STATUS LIKE 'Ssl_cipher'"
_ = show_sql_df(query, cur, table_output=True)

**Notes**
1. The `Value` column of the output will be
   - empty, if SSL is not configured on the MySQL server
   - populated, if the MySQL server enforces a SSL connection to the client

Show available databases after creating the database (`mydbdemo` by default)

In [None]:
%%time
# Display the databases visible to the admin user
query = "SHOW DATABASES"
_ = show_sql_df(query, cur, table_output=True)

Show the updated user names and selected privileges

In [None]:
%%time
# Display each user with a selection of its global privilege flags
query = "SELECT User, Update_priv, Insert_priv, Alter_priv, Grant_priv, Drop_priv FROM mysql.user"
_ = show_sql_df(query, cur, table_output=True)

In [None]:
# Close the cursor before its connection: once the connection is closed the
# cursor can no longer be used reliably
cur.close()
mydb.close()

<a id="create-and-populate-tables"></a>

## 4. [Create and Populate Tables](#create-and-populate-tables)

<a id="load-data-with-python-to-decide-on-mysql-datatypes"></a>

### 4.1. [Load Data with Python to decide on MySQL datatypes](#load-data-with-python-to-decide-on-mysql-datatypes)

Load the data with `pandas`

In [None]:
%%time
df = load_concatenate_listings_search_results(
    fpaths_selenium, selenium_search_results_pages
)
display(df.head(1).append(df.tail(1)))
display(df[["platform_names", "platforms_listings", "languages", "num_languages"]].sample(5))
display(df.dtypes.rename("dtype").to_frame())

We'll now show the datatype, maximum length (number of characters) and a random non-missing value from the columns that are of string datatype (upper) and non-strings (floats or integers, lower)

In [None]:
# Summary of the string (object dtype) columns, including max string length
summarize_df(df, "object")
# Summary of all remaining (non-object) columns, including max value
summarize_df(df, "numerical")

**Observations**
1. There are a number of numerical columns that `pandas` has as `float`s but that should be integers. These are the identification columns, such as `page`, `page_num`, `listing_num`, etc. Of these, `page` and `listing_num` are not missing values and they shouldn't be missing any. `page_num` is only missing since some listings that appeared in the search results were not scraped, as has been discussed in earlier notebooks. There are also the number of positive (`review_type_positive`) and negative (`review_type_negative`) reviews that need to be integers. The same is true for `review_language_mine` (the number of listing reviews in the selected language), `num_languages` (the number of languages supported by the listing) and `num_steam_achievements`(the number of Steam achievements, which will be an integer with a value of 0 or higher). All these columns should be integers but, due to the presence of missing values in `pandas`, are floats in the `DataFrame`. The `int` dtype in `MySQL` supports the `NULL` values. So, we need to, for example, convert all these columns into `int`s after loading the data.

   Since the raw data is being loaded into a `DataFrame` before inserting it into the MySQL table, the `DataFrame` datatype of these `int` columns will be `float` since they have missing values. When this `DataFrame` is then appended to the database table, the data in that table's corresponding columns will also be `float`s (since a `DataFrame` can't have an `int` with missing values in a column). So, we'll have to populate the table and then use a MySQL `ALTER TABLE` command to change the columns' datatype to `int`.

   Since the database will only be created and populated after all data has been scraped, a single `ALTER TABLE` command (on the populated table) can be run. This approach will be used here. Alternatively, if we wanted to iteratively append new data to the table (immediately after scraping), we could leave these columns as a MySQL `float`, append new rows (with these columns again appearing as `float`s) and only run the `ALTER TABLE` command to convert them to a MySQL `int` after all the scraping is done and no further data is to be added to the table.
2. For the string columns, we can use the above displayed output (the upper of the two) to pick the MySQL datatype for those columns of the table.

<a id="specify-mysql-commands-to-create-table-and-alter-datatypes"></a>

### 4.2. [Specify MySQL commands to create table and alter datatypes](#specify-mysql-commands-to-create-table-and-alter-datatypes)

Based on the previous sub-section, we'll create a dictionary of non-string columns with their new datatypes (integer) and, use these to create a list of `ALTER TABLE` commands to modify the column datatypes after inserting data into the table

In [None]:
# Columns to convert after loading; every one of them gets the same MySQL
# datatype, so the mapping is built from the list of column names
col_dtypes_to_change_dict = dict.fromkeys(
    (
        "num_languages",
        "page_num",
        "listing_num",
        "num_steam_achievements",
        "review_type_positive",
        "review_type_negative",
        "review_language_mine",
    ),
    "int",
)
# One ALTER TABLE statement per column whose datatype is to be changed
cn_queries = [
    f"ALTER TABLE {table_name} MODIFY {col_name} {new_dtype}"
    for (col_name, new_dtype) in col_dtypes_to_change_dict.items()
]

We'll now define the MySQL `CREATE TABLE` command

In [None]:
# CREATE TABLE statement for the table that holds the merged data. The
# auto-incrementing listing_id primary key is generated by MySQL. The columns
# that are converted to int after loading are declared as float here.
create_table_query = f"""
                     CREATE TABLE IF NOT EXISTS {table_name} (
                         listing_id INT NOT NULL AUTO_INCREMENT,
                         page int,
                         listing_counter int,
                         title text COLLATE utf8mb4_unicode_ci,
                         url blob,
                         platform_names varchar(50),
                         release_date varchar(20),
                         discount_pct varchar(10),
                         original_price varchar(50),
                         discount_price varchar(50),
                         review_type_all float,
                         overall_review_rating varchar(30),
                         pct_overall float,
                         pct_overall_threshold varchar(20),
                         pct_overall_lang float,
                         pct_overall_threshold_lang varchar(20),
                         platforms_listings varchar(25),
                         user_defined_tags text,
                         num_steam_achievements float,
                         drm text COLLATE utf8mb4_unicode_ci,
                         rating varchar(10),
                         rating_descriptors varchar(150),
                         review_type_positive float,
                         review_type_negative float,
                         review_language_mine float,
                         Title_listings text COLLATE utf8mb4_unicode_ci,
                         Genre varchar(150),
                         Release_Date_listings varchar(20),
                         Early_Access_Release_Date varchar(20),
                         Developer varchar(150) COLLATE utf8mb4_unicode_ci,
                         Publisher varchar(150) COLLATE utf8mb4_unicode_ci,
                         Franchise varchar(150) COLLATE utf8mb4_unicode_ci,
                         languages text,
                         num_languages float,
                         page_num float,
                         listing_num float,
                         PRIMARY KEY (listing_id)
                     ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
                     """

<a id="create-table-and-append-`dataframe`-to-table"></a>

### 4.3. [Create table and append `DataFrame` to table](#create-table-and-append-`dataframe`-to-table)

We'll now create and populate the single `listings` table that will hold the processed version of the scraped data and then show the first four rows of the table

In [None]:
# Query returning all columns of (at most) four rows of the table
fetch_query = f"""SELECT * FROM {table_name} LIMIT 4"""

In [None]:
%%time
# Build the flow: create the table, extract/transform the raw data, load it
# into the table and finally query a few rows back
with Flow("Create and Populate Listings Table") as flow:
    # Create table in database
    create_table_output = run_sql_query(conn_dict, create_table_query, ssl_cert_path)

    # Extract raw listings and search results and Transform
    proc_data_output = extract_transform(
        fpaths_selenium,
        selenium_search_results_pages,
        # upstream_tasks=[create_table_output],
    )

    # Load transformed data into database table
    # (runs only after both the table creation and the transform tasks)
    loaded_output = load_my_data(
        proc_data_output, table_name, db_connection,
        upstream_tasks=[create_table_output, proc_data_output],
    )

    # Query database (after the data has been loaded)
    query_output = query_database(
        conn_dict,
        fetch_query,
        table_name,
        ssl_cert_path,
        upstream_tasks=[loaded_output]
    )


# Run the flow, then replace the task reference held in `query_output` by the
# result that task produced
state_query = flow.run()
query_output = state_query.result[query_output].result
display(query_output)

<a id="change-column-datatypes"></a>

### 4.4. [Change column datatypes](#change-column-datatypes)

We'll now change the column datatypes to `INT` for a subset of the non-string columns that we identified earlier

In [None]:
%%time
# Build the flow: run every ALTER TABLE statement, then query a few rows back
with Flow("Update Listings Table Datatypes") as flow:
    # Change column datatype in table
    # (one mapped task run per query; the connection settings and certificate
    # path are the same for every run, hence `unmapped`)
    alter_table_output = run_sql_query.map(
        query=cn_queries,
        conn_dict=unmapped(conn_dict),
        ssl_cert_path=unmapped(ssl_cert_path),
    )

    # Query database (after the datatypes have been changed)
    query_output = query_database(
        conn_dict,
        fetch_query,
        table_name,
        ssl_cert_path,
        upstream_tasks=[alter_table_output]
    )


# Run the flow, then replace the task reference held in `query_output` by the
# result that task produced
state_query = flow.run()
query_output = state_query.result[query_output].result
display(query_output)

Finally, we'll show the datatypes of the table columns that were changed from `float`s to `int`s

In [None]:
%%time
# Look up the name and datatype of every column of the table
col_dtype_query = f"""
                  SELECT column_name,
                         data_type
                  FROM information_schema.columns
                  WHERE table_schema = '{database_name}'
                  AND table_name = '{table_name}'
                  """
# Names of the columns whose datatype was changed
col_dtypes_changed = list(col_dtypes_to_change_dict)
# Keep only the rows describing the changed columns
# NOTE(review): the filter expects the result column to be labelled
# COLUMN_NAME (upper case) - confirm this holds for the MySQL version in use.
# If the query fails, the error is printed and the empty result has no such
# column, so the filter itself then raises.
df_col_dtypes = sqlalchemy_show_query_df(col_dtype_query, db_connection, True, False).query(
    "COLUMN_NAME in @col_dtypes_changed"
)
display(df_col_dtypes)

<a id="drop-tables"></a>

## 5. [Drop Tables](#drop-tables)

Drop table from the database

In [None]:
# Statement that removes the table if it exists
drop_table_query = f"""DROP TABLE IF EXISTS {table_name}"""

In [None]:
%%time
# The flow is always built, but it is only run when `drop_table` is True
with Flow("Delete Listings Table") as flow:
    # Delete table
    drop_table_output = run_sql_query(conn_dict, drop_table_query, ssl_cert_path)

if drop_table:
    state_query = flow.run()

---

<span style="float:left">
    <a href="./9_download_cloud.ipynb"><< 9 - Download all scraped data (with requests and selenium) from cloud storage</a>
</span>

<span style="float:right">
    2021 | <a href="https://github.com/elsdes3/web-scraping">@elsdes3</a> (MIT)
</span>