# Data Migration: CSV to MySQL


## Configuration


In [1]:
import json
import os
import urllib.parse

import numpy as np
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# Environment Variables
load_dotenv(dotenv_path="../../../../.env")
MYSQL_DB = os.getenv("MYSQL_DB")
MYSQL_DB_USERNAME = os.getenv("MYSQL_DB_USERNAME")
MYSQL_DB_PASSWORD = os.getenv("MYSQL_DB_PASSWORD")
MYSQL_DB_HOST = os.getenv("MYSQL_DB_HOST")
MYSQL_DB_PORT = os.getenv("MYSQL_DB_PORT")

# Fail fast with a readable message instead of building a URI that contains
# "None" (or crashing inside quote_plus on a missing password).
_missing_env = [
    name
    for name, value in {
        "MYSQL_DB": MYSQL_DB,
        "MYSQL_DB_USERNAME": MYSQL_DB_USERNAME,
        "MYSQL_DB_PASSWORD": MYSQL_DB_PASSWORD,
        "MYSQL_DB_HOST": MYSQL_DB_HOST,
        "MYSQL_DB_PORT": MYSQL_DB_PORT,
    }.items()
    if value is None
]
if _missing_env:
    raise RuntimeError(f"Missing environment variables: {', '.join(_missing_env)}")

# Spark Session: a single local worker, with the MySQL connector jar registered
# through spark.jars.
spark = (
    SparkSession.builder
    .appName("Play With Data")
    .master("local[1]")
    .config("spark.jars", "../../../../library/mysql-connector-j-8.1.0.jar")
    .getOrCreate()
)

# Product id removed from every table built below. The reason for excluding it
# is not recorded here — TODO document it.
EXCLUDED_PRODUCT_ID = "TEC-AC-10004659"

# Dataframe: spark.read.csv without inferSchema reads every column as a string.
df_csv_superstore = spark.read.csv(
    path="../../../../data/raw/sample_superstore.csv",
    header=True,
    sep=",",
    escape='"',
)
df_csv_superstore = df_csv_superstore.where(
    df_csv_superstore["Product ID"] != EXCLUDED_PRODUCT_ID
)
# Registered so later cells can query it with spark.sql.
df_csv_superstore.createOrReplaceTempView("df_csv_superstore")
# The Sales Order section parses dates with to_date(..., 'MM/dd/yyyy'); the
# legacy parser is presumably needed for dates with single-digit parts — confirm.
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

# DB Connection. Username and password are both percent-encoded so characters
# such as '@', ':' or '/' cannot break the URI (SQLAlchemy decodes them again).
URI = (
    f"mysql+pymysql://{urllib.parse.quote_plus(MYSQL_DB_USERNAME)}"
    f":{urllib.parse.quote_plus(MYSQL_DB_PASSWORD)}"
    f"@{MYSQL_DB_HOST}:{MYSQL_DB_PORT}/{MYSQL_DB}"
)
engine = create_engine(url=URI, pool_pre_ping=True)
# Keep the factory and the instance under different names; later cells use
# `session` and close() it after each step (a closed Session can be reused).
Session = sessionmaker(bind=engine)
session = Session()

24/12/24 17:45:30 WARN Utils: Your hostname, rhayeksa-Infinix-INBook-X1 resolves to a loopback address: 127.0.1.1; using 192.168.0.108 instead (on interface wlo1)
24/12/24 17:45:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/12/24 17:45:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Migration


### Customer


In [2]:
TBL_CUST_ADDR = "tbl_customer_addr"
TBL_SAL_ORD = "tbl_sales_order"
FK_CUST_ADDR_CUST = "fk_cust_addr_cust"
FK_SAL_ODR_CUST = "fk_sal_ord_cust"
# Rebuild tbl_customer from the distinct (id, name, segment) rows of the CSV.
# The named foreign keys on tbl_customer_addr / tbl_sales_order (presumably
# pointing at tbl_customer) are dropped first so the DROP TABLE is not blocked.
# NOTE: MySQL DDL commits implicitly, so rollback() can only undo the INSERT.
try:
    # Aliased explicitly: MySQL 8 may return INFORMATION_SCHEMA labels upper-case.
    fk_rows = session.execute(
        text(
            f"""
            SELECT TABLE_NAME AS table_name, INDEX_NAME AS index_name
            FROM INFORMATION_SCHEMA.STATISTICS
            WHERE index_schema = :schema
            AND TABLE_NAME IN('{TBL_CUST_ADDR}', '{TBL_SAL_ORD}')
            AND INDEX_NAME IN('{FK_CUST_ADDR_CUST}', '{FK_SAL_ODR_CUST}')
            """
        ),
        {"schema": MYSQL_DB},
    ).mappings().fetchall()
    for fk in fk_rows:
        session.execute(text(
            f"""
            ALTER TABLE {MYSQL_DB}.{fk['table_name']}
                DROP FOREIGN KEY {fk['index_name']}
                , DROP INDEX {fk['index_name']}
            """
        ))
    session.execute(text(f"DROP TABLE IF EXISTS {MYSQL_DB}.tbl_customer"))
    session.execute(text(
        f"""
        CREATE TABLE IF NOT EXISTS {MYSQL_DB}.tbl_customer(
          customer_id VARCHAR(25) NOT NULL
          , name VARCHAR(45) NOT NULL
          , segment VARCHAR(15) NOT NULL
          , PRIMARY KEY (customer_id)
        )
        """
    ))

    customers = spark.sql(
        """
        SELECT
          `Customer ID` AS customer_id
          , `Customer Name` AS customer_name
          , segment
        FROM df_csv_superstore
        GROUP BY `Customer ID`, `Customer Name`, segment
        """
    ).collect()

    # Bound parameters instead of a hand-escaped VALUES string: quotes and
    # backslashes in names pass through safely, and the driver batches rows.
    customer_params = [
        {
            "customer_id": row["customer_id"],
            "name": row["customer_name"],
            "segment": row["segment"],
        }
        for row in customers
    ]
    # An empty parameter list would otherwise be an invalid statement.
    if customer_params:
        session.execute(
            text(
                f"""
                INSERT INTO {MYSQL_DB}.tbl_customer(customer_id, name, segment)
                VALUES (:customer_id, :name, :segment)
                """
            ),
            customer_params,
        )

    session.commit()
    print("Successfully migrated")
except Exception as e:
    print(f"\nError msg : {e}\n")
    session.rollback()
    # Re-raise so "Run All" stops instead of building later tables on a failed step.
    raise
finally:
    session.close()

[Stage 3:>                                                          (0 + 1) / 1]

Successfully migrated


                                                                                

### Customer Address


In [3]:
# Rebuild tbl_customer_addr: one row per distinct (customer, country, city,
# state, postal code, region) in the CSV, keyed by a generated CUST-ADDR-<n> id.
# NOTE: MySQL DDL commits implicitly, so rollback() can only undo the INSERT.
try:
    session.execute(text(f"DROP TABLE IF EXISTS {MYSQL_DB}.tbl_customer_addr"))
    session.execute(text(
        f"""
        CREATE TABLE IF NOT EXISTS {MYSQL_DB}.tbl_customer_addr(
            customer_addr_id VARCHAR(25) NOT NULL
            , customer_id VARCHAR(25) NOT NULL
            , country VARCHAR(15) NOT NULL
            , city VARCHAR(25) NOT NULL
            , state VARCHAR(25) NOT NULL
            , postal_code INT NOT NULL
            , region VARCHAR(10) NOT NULL
            , PRIMARY KEY (customer_addr_id)
        )
        """
    ))

    # ORDER BY makes the generated CUST-ADDR-<n> ids reproducible across runs;
    # a bare GROUP BY returns rows in no guaranteed order.
    addresses = spark.sql(
        """
        SELECT
            `Customer ID` customer_id
            , country
            , city
            , state
            , `postal code` postal_code
            , region
        FROM df_csv_superstore
        GROUP BY `customer id`, country, city, state, `postal code`, region
        ORDER BY customer_id, country, city, state, postal_code, region
        """
    ).collect()

    # Bound parameters instead of a hand-escaped VALUES string; postal_code is
    # passed as read from the CSV and converted by MySQL into the INT column.
    address_params = [
        {
            "customer_addr_id": f"CUST-ADDR-{position}",
            "customer_id": row["customer_id"],
            "country": row["country"],
            "city": row["city"],
            "state": row["state"],
            "postal_code": row["postal_code"],
            "region": row["region"],
        }
        for position, row in enumerate(addresses, start=1)
    ]
    if address_params:
        session.execute(
            text(
                f"""
                INSERT INTO {MYSQL_DB}.tbl_customer_addr(customer_addr_id, customer_id, country, city, state, postal_code, region)
                VALUES (:customer_addr_id, :customer_id, :country, :city, :state, :postal_code, :region)
                """
            ),
            address_params,
        )

    session.commit()
    print("Successfully migrated")
except Exception as e:
    print(f"\nError msg : {e}\n")
    session.rollback()
    # Re-raise so "Run All" stops on a failed step.
    raise
finally:
    session.close()

                                                                                

Successfully migrated


### Product


In [4]:
TBL_PROD_PRC = "tbl_product_price"
TBL_SAL_ORD_DET = "tbl_sales_order_detail"
FK_PROD_PRC_PROD = "fk_prod_prc_prod"
FK_SAL_ORD_DET_PROD = "fk_sal_ord_det_prod"
# Rebuild tbl_product from the distinct (id, name, category, sub-category) rows
# of the CSV. The named foreign keys on tbl_product_price /
# tbl_sales_order_detail (presumably pointing at tbl_product) are dropped first.
# NOTE: MySQL DDL commits implicitly, so rollback() can only undo the INSERT.
try:
    # Aliased explicitly: MySQL 8 may return INFORMATION_SCHEMA labels upper-case.
    fk_rows = session.execute(
        text(
            f"""
            SELECT TABLE_NAME AS table_name, INDEX_NAME AS index_name
            FROM INFORMATION_SCHEMA.STATISTICS
            WHERE index_schema = :schema
            AND TABLE_NAME IN('{TBL_PROD_PRC}', '{TBL_SAL_ORD_DET}')
            AND INDEX_NAME IN('{FK_PROD_PRC_PROD}', '{FK_SAL_ORD_DET_PROD}')
            """
        ),
        {"schema": MYSQL_DB},
    ).mappings().fetchall()
    for fk in fk_rows:
        session.execute(text(
            f"""
            ALTER TABLE {MYSQL_DB}.{fk['table_name']}
                DROP FOREIGN KEY {fk['index_name']}
                , DROP INDEX {fk['index_name']}
            """
        ))
    session.execute(text(f"DROP TABLE IF EXISTS {MYSQL_DB}.tbl_product"))
    session.execute(text(
        f"""
        CREATE TABLE IF NOT EXISTS {MYSQL_DB}.tbl_product(
          product_id VARCHAR(20) NOT NULL
          , name VARCHAR(225) NOT NULL
          , category VARCHAR(15) NOT NULL
          , sub_category VARCHAR(15) NOT NULL
          , PRIMARY KEY (product_id)
        )
        """
    ))

    # ORDER BY fixes which variant of a shared product id keeps the plain id.
    products = spark.sql(
        """
        SELECT
            `product id` product_id
            , `product name` product_name
            , category
            , `sub-category` sub_category
        FROM df_csv_superstore
        GROUP BY `product id`, `product name`, category, `sub-category`
        ORDER BY product_id, product_name, category, sub_category
        """
    ).collect()

    # A CSV product id can appear with more than one name/category. The first
    # variant keeps the id; each further variant gets "D" appended until the id
    # is unused, so the primary key stays unique however many variants exist.
    # (Appending a single "D" collided on a third variant.) Tracking ids in
    # memory also replaces the per-row SELECT against the freshly created table.
    used_ids = set()
    product_params = []
    for row in products:
        product_id = row["product_id"]
        while product_id in used_ids:
            product_id += "D"
        used_ids.add(product_id)
        product_params.append({
            "product_id": product_id,
            "name": row["product_name"],
            "category": row["category"],
            "sub_category": row["sub_category"],
        })

    if product_params:
        session.execute(
            text(
                f"""
                INSERT INTO {MYSQL_DB}.tbl_product(product_id, name, category, sub_category)
                VALUES(:product_id, :name, :category, :sub_category)
                """
            ),
            product_params,
        )

    session.commit()
    print("Successfully migrated")
except Exception as e:
    print(f"\nError msg : {e}\n")
    session.rollback()
    # Re-raise so "Run All" stops on a failed step.
    raise
finally:
    session.close()

Successfully migrated


### Product Price


In [10]:
def _resolve_product_id(ids_by_name, csv_product_id, product_name):
    """Map a CSV (product id, product name) pair to its tbl_product.product_id.

    Args:
        ids_by_name: dict of tbl_product name -> list of product_id values.
        csv_product_id: the raw `Product ID` from the CSV row.
        product_name: the raw `Product Name` from the CSV row.

    The Product section appends "D" to a product id that is already taken, so
    the stored id is expected to be the CSV id followed by zero or more "D"s.
    Among the products carrying ``product_name``, the one whose id has that
    shape wins; if none does, the first product with that name is returned
    (the previous name-only behaviour).

    Raises:
        LookupError: when no tbl_product row has ``product_name``.
    """
    candidates = ids_by_name.get(product_name, [])
    for candidate in candidates:
        suffix = candidate[len(csv_product_id):]
        if candidate.startswith(csv_product_id) and suffix.strip("D") == "":
            return candidate
    if candidates:
        return candidates[0]
    raise LookupError(
        f"No tbl_product row named {product_name!r} (CSV product id {csv_product_id!r})"
    )


# Rebuild tbl_product_price: one row per distinct (product, discount, sales)
# combination in the CSV, keyed by a generated PRC-<n> id.
# sales_price = ROUND(sales / (1 - discount) / quantity, 4), presumably the
# pre-discount unit price; capital_price and profit are written as 0 here.
# NOTE: MySQL DDL commits implicitly, so rollback() can only undo the INSERT.
try:
    session.execute(text(f"DROP TABLE IF EXISTS {MYSQL_DB}.tbl_product_price"))
    session.execute(text(
        f"""
        CREATE TABLE IF NOT EXISTS {MYSQL_DB}.tbl_product_price(
            product_price_id VARCHAR(15) NOT NULL
            , product_id VARCHAR(20) NOT NULL
            , capital_price FLOAT NOT NULL
            , sales_price FLOAT NOT NULL
            , profit FLOAT NOT NULL
            , discount FLOAT NOT NULL
            , PRIMARY KEY (product_price_id)
        )
        """
    ))

    # `product id` is now selected so each price row resolves to its own
    # product instead of the first product sharing its name. ORDER BY makes
    # the PRC-<n> ids reproducible across runs.
    prices = spark.sql(
        """
        SELECT
            `product id` product_id
            , `product name` product_name
            , discount
            , ROUND((sales/(1-discount)) / quantity, 4) sales_price
        FROM df_csv_superstore
        GROUP BY sales, discount, `product id`, `product name`, ROUND((sales/(1-discount)) / quantity, 4)
        ORDER BY product_id, product_name, discount, sales_price
        """
    ).collect()

    # Index products by name once (replaces a linear filter() per price row).
    ids_by_name = {}
    for product in session.execute(
        text(f"SELECT product_id, name FROM {MYSQL_DB}.tbl_product")
    ).mappings():
        ids_by_name.setdefault(product["name"], []).append(product["product_id"])

    price_params = [
        {
            "product_price_id": f"PRC-{position}",
            "product_id": _resolve_product_id(
                ids_by_name, row["product_id"], row["product_name"]
            ),
            "discount": float(row["discount"]),
            "capital_price": 0,
            "sales_price": float(row["sales_price"]),
            "profit": 0,
        }
        for position, row in enumerate(prices, start=1)
    ]
    if price_params:
        session.execute(
            text(
                f"""
                INSERT INTO {MYSQL_DB}.tbl_product_price(product_price_id, product_id, discount, capital_price, sales_price, profit)
                VALUES (:product_price_id, :product_id, :discount, :capital_price, :sales_price, :profit)
                """
            ),
            price_params,
        )

    session.commit()
    print("Successfully migrated")
except Exception as e:
    print(f"\nError msg : {e}\n")
    session.rollback()
    # Re-raise so "Run All" stops on a failed step.
    raise
finally:
    session.close()

Successfully migrated


### Ship


In [6]:
TBL_SAL_ORD = "tbl_sales_order"
FK_SAL_ODR_SHIP = "fk_sal_ord_ship"
# Another constraint name on tbl_sales_order that is also dropped when present;
# presumably left over from an earlier schema — confirm before removing.
FK_SAL_ODR_SHIP_LEGACY = "fk_ship_id"
# Rebuild tbl_ship: one row per distinct ship mode, ids SHIP-1..n in mode order.
# NOTE: MySQL DDL commits implicitly, so rollback() can only undo the INSERT.
try:
    # One lookup covers both constraint names. Columns are aliased because
    # MySQL 8 returns unaliased INFORMATION_SCHEMA labels upper-case, which
    # made the old `i["table_name"]` lookup raise KeyError.
    fk_rows = session.execute(
        text(
            f"""
            SELECT TABLE_NAME AS table_name, INDEX_NAME AS index_name
            FROM INFORMATION_SCHEMA.STATISTICS
            WHERE index_schema = :schema
            AND TABLE_NAME IN('{TBL_SAL_ORD}')
            AND INDEX_NAME IN('{FK_SAL_ODR_SHIP}', '{FK_SAL_ODR_SHIP_LEGACY}')
            """
        ),
        {"schema": MYSQL_DB},
    ).mappings().fetchall()
    for fk in fk_rows:
        session.execute(text(
            f"""
            ALTER TABLE {MYSQL_DB}.{fk['table_name']}
                DROP FOREIGN KEY {fk['index_name']}
                , DROP INDEX {fk['index_name']}
            """
        ))
    session.execute(text(f"DROP TABLE IF EXISTS {MYSQL_DB}.tbl_ship"))
    session.execute(text(
        f"""
        CREATE TABLE IF NOT EXISTS {MYSQL_DB}.tbl_ship(
          ship_id VARCHAR(15) NOT NULL
          , mode VARCHAR(45) NOT NULL
          , PRIMARY KEY (ship_id)
        )
        """
    ))

    modes = spark.sql(
        """
        SELECT `ship mode` AS mode FROM df_csv_superstore
        GROUP BY `ship mode` ORDER BY `ship mode`
        """
    ).collect()

    # Bound parameters: the old .replace("'", "\'") was a no-op ("\'" == "'"),
    # so a quote in a mode name would have broken the statement.
    ship_params = [
        {"ship_id": f"SHIP-{position}", "mode": row["mode"]}
        for position, row in enumerate(modes, start=1)
    ]
    if ship_params:
        session.execute(
            text(
                f"""
                INSERT INTO {MYSQL_DB}.tbl_ship(ship_id, mode)
                VALUES (:ship_id, :mode)
                """
            ),
            ship_params,
        )

    session.commit()
    print("Successfully migrated")
except Exception as e:
    print(f"\nError msg : {e}\n")
    session.rollback()
    # Re-raise so "Run All" stops on a failed step.
    raise
finally:
    session.close()

Successfully migrated


### Sales Order


In [7]:
TBL_SAL_ORD_DET = "tbl_sales_order_detail"
FK_SAL_ORD_DET_SAL_ORD = "fk_sal_ord_det_sal_ord"
# Rebuild tbl_sales_order: one row per (order, dates, customer, ship mode) with
# summed sales/profit; the ship mode is mapped to tbl_ship.ship_id.
# NOTE: MySQL DDL commits implicitly, so rollback() can only undo the INSERT.
try:
    # Aliased explicitly: MySQL 8 may return INFORMATION_SCHEMA labels upper-case.
    fk_rows = session.execute(
        text(
            f"""
            SELECT TABLE_NAME AS table_name, INDEX_NAME AS index_name
            FROM INFORMATION_SCHEMA.STATISTICS
            WHERE index_schema = :schema
            AND TABLE_NAME = '{TBL_SAL_ORD_DET}'
            AND INDEX_NAME IN('{FK_SAL_ORD_DET_SAL_ORD}')
            """
        ),
        {"schema": MYSQL_DB},
    ).mappings().fetchall()
    for fk in fk_rows:
        session.execute(text(
            f"""
            ALTER TABLE {MYSQL_DB}.{fk['table_name']}
                DROP FOREIGN KEY {fk['index_name']}
                , DROP INDEX {fk['index_name']}
            """
        ))
    session.execute(text(f"DROP TABLE IF EXISTS {MYSQL_DB}.tbl_sales_order"))
    session.execute(text(
        f"""
        CREATE TABLE IF NOT EXISTS {MYSQL_DB}.tbl_sales_order(
          sales_order_id VARCHAR(20) NOT NULL
          , order_date DATE NOT NULL
          , ship_date DATE NOT NULL
          , ship_id VARCHAR(15) NOT NULL
          , customer_id VARCHAR(15) NOT NULL
          , grand_total_sales FLOAT NOT NULL
          , grand_total_profit FLOAT NOT NULL
          , PRIMARY KEY (sales_order_id)
        )
        """
    ))

    # Totals are summed as DOUBLE: Spark's FLOAT is 4-byte single precision
    # and drifts from the per-line values stored in tbl_sales_order_detail.
    orders = spark.sql(
        """
        SELECT
            `order id` sales_order_id
            , `ship mode` ship_mode
            , to_date(`order date`, 'MM/dd/yyyy') order_date
            , to_date(`ship date`, 'MM/dd/yyyy') ship_date
            , `customer id` customer_id
            , ROUND(SUM(CAST(sales AS DOUBLE)), 4) grand_total_sales
            , ROUND(SUM(CAST(profit AS DOUBLE)), 4) grand_total_profit
        FROM df_csv_superstore
        GROUP BY `order id`, `order date`, `ship date`, `customer id`, `ship mode`
        """
    ).collect()

    # mode -> ship_id lookup built once (replaces a linear filter() per order).
    ship_id_by_mode = {
        ship["mode"]: ship["ship_id"]
        for ship in session.execute(
            text(f"SELECT ship_id, mode FROM {MYSQL_DB}.tbl_ship")
        ).mappings()
    }

    # Bound parameters: the old .replace("'", "\'") was a no-op ("\'" == "'").
    order_params = [
        {
            "sales_order_id": row["sales_order_id"],
            "order_date": row["order_date"],
            "ship_id": ship_id_by_mode[row["ship_mode"]],
            "ship_date": row["ship_date"],
            "customer_id": row["customer_id"],
            "grand_total_sales": float(row["grand_total_sales"]),
            "grand_total_profit": float(row["grand_total_profit"]),
        }
        for row in orders
    ]
    if order_params:
        session.execute(
            text(
                f"""
                INSERT INTO {MYSQL_DB}.tbl_sales_order(sales_order_id, order_date, ship_id, ship_date, customer_id, grand_total_sales, grand_total_profit)
                VALUES (:sales_order_id, :order_date, :ship_id, :ship_date, :customer_id, :grand_total_sales, :grand_total_profit)
                """
            ),
            order_params,
        )

    session.commit()
    print("Successfully migrated")
except Exception as e:
    print(f"\nError msg : {e}\n")
    session.rollback()
    # Re-raise so "Run All" stops on a failed step.
    raise
finally:
    session.close()

Successfully migrated


### Sales Order Detail


In [8]:
# Rebuild tbl_sales_order_detail: one row per CSV line (no grouping, no primary
# key). product_id is the raw CSV id; it does not carry the "D" suffix the
# Product section gives to duplicate ids — TODO decide how details should
# reference those variants.
# NOTE: MySQL DDL commits implicitly, so rollback() can only undo the INSERT.
try:
    session.execute(text(
        f"DROP TABLE IF EXISTS {MYSQL_DB}.tbl_sales_order_detail"
    ))
    session.execute(text(
        f"""
        CREATE TABLE IF NOT EXISTS {MYSQL_DB}.tbl_sales_order_detail(
          sales_order_id VARCHAR(20) NOT NULL
          , product_id VARCHAR(15) NOT NULL
          , quantity INT NOT NULL
          , discount FLOAT NOT NULL
          , total_sales FLOAT NOT NULL
          , total_profit FLOAT NOT NULL
        )
        """
    ))

    details = spark.sql(
        """
        SELECT
            `order id` sales_order_id
            , `product id` product_id
            , quantity
            , discount
            , sales total_sales
            , profit total_profit
        FROM df_csv_superstore
        """
    ).collect()

    # Bound parameters: the old .replace("'", "\'") was a no-op ("\'" == "'").
    # CSV columns arrive as strings, so numbers are converted here.
    detail_params = [
        {
            "sales_order_id": row["sales_order_id"],
            "product_id": row["product_id"],
            "quantity": int(row["quantity"]),
            "discount": float(row["discount"]),
            "total_sales": float(row["total_sales"]),
            "total_profit": float(row["total_profit"]),
        }
        for row in details
    ]
    if detail_params:
        session.execute(
            text(
                f"""
                INSERT INTO {MYSQL_DB}.tbl_sales_order_detail(sales_order_id, product_id, quantity, discount, total_sales, total_profit)
                VALUES (:sales_order_id, :product_id, :quantity, :discount, :total_sales, :total_profit)
                """
            ),
            detail_params,
        )

    session.commit()
    print("Successfully migrated")
except Exception as e:
    print(f"\nError msg : {e}\n")
    session.rollback()
    # Re-raise so "Run All" stops on a failed step.
    raise
finally:
    session.close()

Successfully migrated
