## Connect to and query MySQL databases and perform ETL tasks

This small project shows how to extract and load data to local MySQL databases using spark. It extracts normalized data from database 'world' and transforms it to a One Big Table model, <br> loading it into another MySQL Database <br> 
<br>
Database world is located on [MySQL page](https://dev.mysql.com/doc/index-other.html) <br>
For guidance on spark and hadoop installation check [this youtube video](https://www.youtube.com/watch?v=FIXanNPvBXM) <br>
A mysql connector need to be downloaded from [here](https://dev.mysql.com/downloads/connector/j/). the .jar file can be copied from the connector folder to spark's jars folders to avoid a common error <br>
MySQL user id and password need to be stored as environment variables

### Import required libraries

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
import os
import sys
from pyspark.sql.functions import *

### Test connection to MySQL database

In [2]:
conf = SparkConf() \
    .setAppName('ETLPipeline') \
    .setMaster('local') 

sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

pwd = os.environ['SQLP']
uid = os.environ['SQLU']

server = 'localhost'
src_db = 'world'
target_db = 'world_obt'
src_driver = 'com.mysql.cj.jdbc.Driver'
target_driver = 'com.mysql.cj.jdbc.Driver'
src_url = f"jdbc:mysql://{server}:3306/{src_db}?user={uid}&password={pwd}&useSSL=false"
target_url = f"jdbc:mysql://{server}:3306/{src_db}?user={uid}&password={pwd}&useSSL=false"

target_table = "city_language"

In [3]:
sql = """
SELECT table_name 
FROM information_schema.tables 
WHERE table_schema = DATABASE() 
AND table_name IN ('city', 'country', 'countrylanguage')

 """

# Let's test our connection
df_tables=spark.read. \
    format("jdbc"). \
    options(driver=src_driver, user=uid, password=pwd, url=src_url, query=sql). \
    load()
df_tables.show()

+---------------+
|     TABLE_NAME|
+---------------+
|           city|
|        country|
|countrylanguage|
+---------------+



In [4]:
# Function to execute sql queries
def execute_sql_query(spark, sql_query, driver, user, password, url,limit=40):
    dfs = spark.read.format("jdbc") \
        .options(driver=driver, user=user, password=password, url=url, query=sql_query) \
        .load()
    return dfs.show(limit)

### Schema Queries

In [5]:
sql = """
SELECT * 
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = 'world'
"""

execute_sql_query(spark, sql, src_driver, uid, pwd, src_url)

+-------------+------------+---------------+-----------+------+-------+----------+----------+--------------+-----------+---------------+------------+---------+--------------+-------------------+-------------------+----------+------------------+--------+--------------+-------------+
|TABLE_CATALOG|TABLE_SCHEMA|     TABLE_NAME| TABLE_TYPE|ENGINE|VERSION|ROW_FORMAT|TABLE_ROWS|AVG_ROW_LENGTH|DATA_LENGTH|MAX_DATA_LENGTH|INDEX_LENGTH|DATA_FREE|AUTO_INCREMENT|        CREATE_TIME|        UPDATE_TIME|CHECK_TIME|   TABLE_COLLATION|CHECKSUM|CREATE_OPTIONS|TABLE_COMMENT|
+-------------+------------+---------------+-----------+------+-------+----------+----------+--------------+-----------+---------------+------------+---------+--------------+-------------------+-------------------+----------+------------------+--------+--------------+-------------+
|          def|       world|           city|BASE TABLE |InnoDB|     10|Dynamic   |      4046|           101|     409600|              0|      114688|  

In [6]:
sql = """
SELECT * 
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = 'world'
"""

execute_sql_query(spark, sql, src_driver, uid, pwd, src_url)

+-------------+------------+---------------+--------------+----------------+--------------+-----------+---------+------------------------+----------------------+-----------------+-------------+------------------+------------------+------------------+--------------------+----------+--------------+--------------------+--------------+---------------------+------+
|TABLE_CATALOG|TABLE_SCHEMA|     TABLE_NAME|   COLUMN_NAME|ORDINAL_POSITION|COLUMN_DEFAULT|IS_NULLABLE|DATA_TYPE|CHARACTER_MAXIMUM_LENGTH|CHARACTER_OCTET_LENGTH|NUMERIC_PRECISION|NUMERIC_SCALE|DATETIME_PRECISION|CHARACTER_SET_NAME|    COLLATION_NAME|         COLUMN_TYPE|COLUMN_KEY|         EXTRA|          PRIVILEGES|COLUMN_COMMENT|GENERATION_EXPRESSION|SRS_ID|
+-------------+------------+---------------+--------------+----------------+--------------+-----------+---------+------------------------+----------------------+-----------------+-------------+------------------+------------------+------------------+--------------------+---

### Other Queries

In [7]:
sql = """
SELECT COUNT(*) 
FROM world.country
"""
execute_sql_query(spark, sql, src_driver, uid, pwd, src_url)

+--------+
|COUNT(*)|
+--------+
|     239|
+--------+



In [8]:
# List of capitals and the respective country and population
sql = """
WITH capital_city_id AS (
    SELECT capital as capital_id
    FROM country
)

SELECT c.Name, c.CountryCode, co.Name as country, co.Population
FROM city c
JOIN capital_city_id cap
JOIN country co
ON c.ID = co.capital
WHERE c.ID = cap.capital_id
ORDER BY Population
"""
execute_sql_query(spark, sql, src_driver, uid, pwd, src_url)

+--------------------+-----------+--------------------+----------+
|                Name|CountryCode|             country|Population|
+--------------------+-----------+--------------------+----------+
|Adamstown        ...|        PCN|Pitcairn         ...|        50|
|West Island      ...|        CCK|Cocos (Keeling) I...|       600|
|Città del Vatican...|        VAT|Holy See (Vatican...|      1000|
|Stanley          ...|        FLK|Falkland Islands ...|      2000|
|Kingston         ...|        NFK|Norfolk Island   ...|      2000|
|Alofi            ...|        NIU|Niue             ...|      2000|
|Fakaofo          ...|        TKL|Tokelau          ...|      2000|
|Flying Fish Cove ...|        CXR|Christmas Island ...|      2500|
|Longyearbyen     ...|        SJM|Svalbard and Jan ...|      3200|
|Jamestown        ...|        SHN|Saint Helena     ...|      6000|
|Saint-Pierre     ...|        SPM|Saint Pierre and ...|      7000|
|The Valley       ...|        AIA|Anguilla         ...|      8

In [9]:
# official languages - group by
sql = """
SELECT language, count(*) as 'country count'
FROM world.countrylanguage
WHERE IsOfficial = 'T'
GROUP BY language
ORDER BY 2 DESC
LIMIT 10
"""
execute_sql_query(spark, sql, src_driver, uid, pwd, src_url)

+--------------------+-------------+
|            language|country count|
+--------------------+-------------+
|English          ...|           44|
|Arabic           ...|           22|
|Spanish          ...|           20|
|French           ...|           18|
|Portuguese       ...|            6|
|German           ...|            6|
|Dutch            ...|            4|
|Italian          ...|            4|
|Malay            ...|            4|
|Russian          ...|            3|
+--------------------+-------------+



In [10]:
# official languages - window function + group by
sql = """
SELECT 
language,
COUNT(countrycode) as official_language_countries,
Rank() over(order by  COUNT(countrycode) DESC) as 'language rank'
FROM countrylanguage 
where 1=1
AND IsOfficial = 'T'
group by language
LIMIT 20
"""
execute_sql_query(spark, sql, src_driver, uid, pwd, src_url)

+--------------------+---------------------------+-------------+
|            language|official_language_countries|language rank|
+--------------------+---------------------------+-------------+
|English          ...|                         44|            1|
|Arabic           ...|                         22|            2|
|Spanish          ...|                         20|            3|
|French           ...|                         18|            4|
|German           ...|                          6|            5|
|Portuguese       ...|                          6|            5|
|Dutch            ...|                          4|            7|
|Malay            ...|                          4|            7|
|Italian          ...|                          4|            7|
|Serbo-Croatian   ...|                          3|           10|
|Russian          ...|                          3|           10|
|Danish           ...|                          3|           10|
|Samoan           ...|   

### Database Denormalization: Normalized to One Big Table

#### Query to denormalize

In [11]:
# we need a composite key to have a primary key. several languages can be spoken in the same city, 
# so we join the city ID with the languages spoken in that city

sql_transform = """
SELECT
    ROW_NUMBER() OVER (ORDER BY co.Name, c.Name, cl.percentage DESC) AS ID,
    CONCAT(c.ID, '-', cl.language) AS composite_key,
    c.ID as city_id,
    c.Name as city_name,
    cl.language,
    co.Name as country_name,
    c.CountryCode,
    cl.isofficial as language_isofficial,
    cl.percentage as language_percentage,
    c.District as city_district,
    c.population as city_population,
    co.continent,
    co.region,
    co.surfacearea,
    co.IndepYear,
    co.population as country_population,
    co.LifeExpectancy,
    co.GNPOld,
    co.localname as country_local_name,
    co.governmentform,
    co.headofstate,
    co.capital as country_capital,
    co.code2 as countrycode_2
FROM countrylanguage cl
JOIN country co
ON co.code = cl.countrycode
JOIN city c
ON c.countrycode = cl.countrycode
ORDER BY 1
"""
execute_sql_query(spark, sql_transform, src_driver, uid, pwd, src_url)

+---+-------------+-------+--------------------+--------------------+--------------------+-----------+-------------------+-------------------+--------------------+---------------+-------------+--------------------+-----------+---------+------------------+--------------+--------+--------------------+--------------------+--------------------+---------------+-------------+
| ID|composite_key|city_id|           city_name|            language|        country_name|CountryCode|language_isofficial|language_percentage|       city_district|city_population|    continent|              region|surfacearea|IndepYear|country_population|LifeExpectancy|  GNPOld|  country_local_name|      governmentform|         headofstate|country_capital|countrycode_2|
+---+-------------+-------+--------------------+--------------------+--------------------+-----------+-------------------+-------------------+--------------------+---------------+-------------+--------------------+-----------+---------+------------------

#### Validations

##### Check if number of rows is correct (if both count number match)

In [12]:
sql = """
with city_language_by_country as (
    SELECT c.countrycode, count(distinct c.ID) * count(distinct cl.language) as combinations_city_language
    FROM city c
    join countrylanguage cl
    ON cl.countrycode = c.countrycode
    group by countrycode
)

select sum(combinations_city_language)
from city_language_by_country
"""
execute_sql_query(spark, sql, src_driver, uid, pwd, src_url)

+-------------------------------+
|sum(combinations_city_language)|
+-------------------------------+
|                          30670|
+-------------------------------+



In [13]:
sql = """
with validation as (
    SELECT
        ROW_NUMBER() OVER (ORDER BY co.Name, c.Name, cl.language) AS ID,
        CONCAT(c.ID, '-', cl.language) AS composite_key,
        c.ID as city_id,
        c.Name as city_name,
        cl.language,
        co.Name as country_name,
        c.CountryCode,
        cl.isofficial as language_isofficial,
        cl.percentage as language_percentage,
        c.District as city_district,
        c.population as city_population,
        co.continent,
        co.region,
        co.surfacearea,
        co.IndepYear,
        co.population as country_population,
        co.LifeExpectancy,
        co.GNPOld,
        co.localname as country_local_name,
        co.governmentform,
        co.headofstate,
        co.capital as country_capital,
        co.code2 as countrycode_2
    FROM countrylanguage cl
    JOIN country co
    ON co.code = cl.countrycode
    JOIN city c
    ON c.countrycode = cl.countrycode
    # ORDER BY city_ID, language_percentage DESC
    ORDER BY 1
)
select count(*)
FROM validation
"""
execute_sql_query(spark, sql, src_driver, uid, pwd, src_url)

+--------+
|count(*)|
+--------+
|   30670|
+--------+



##### Check Dulpicates

Get a spark dataframe out of the query to check for duplicates

In [14]:
df_lang = spark.read.format("jdbc").\
  option("url", src_url).\
  option("driver", "com.mysql.jdbc.Driver").\
  option("useUnicode", "true").\
  option("continueBatchOnError","true").\
  option("useSSL", "false").\
  option("user", uid).\
  option("password", pwd).\
  option("query",sql_transform).\
  load()

In [15]:
df_lang.show(n = 10, truncate=False)

+---+-------------+-------+-----------------------------------+------------------------------+----------------------------------------------------+-----------+-------------------+-------------------+--------------------+---------------+-------------+--------------------------+-----------+---------+------------------+--------------+------+---------------------------------------------+---------------------------------------------+------------------------------------------------------------+---------------+-------------+
|ID |composite_key|city_id|city_name                          |language                      |country_name                                        |CountryCode|language_isofficial|language_percentage|city_district       |city_population|continent    |region                    |surfacearea|IndepYear|country_population|LifeExpectancy|GNPOld|country_local_name                           |governmentform                               |headofstate                                 

In [16]:
column_to_check = "composite_key"

# Identify duplicate keys
duplicate_keys = (
    df_lang.groupBy(column_to_check)  
    .agg(count("*").alias("count"))
    .filter("count > 1")  # Keep only values appearing more than once
    .select(column_to_check)  # Keep only the column itself
)

In [17]:
duplicate_keys.count()

0

##### Check missing countries

In [18]:
sql_country1 = """
select code
from country
ORDER BY code
"""
df_country1 = spark.read.format("jdbc").\
  option("url", src_url).\
  option("driver", "com.mysql.jdbc.Driver").\
  option("useUnicode", "true").\
  option("continueBatchOnError","true").\
  option("useSSL", "false").\
  option("user", uid).\
  option("password", pwd).\
  option("query",sql_country1).\
  load()

In [19]:
sql_country2 = """
with validation as (
    SELECT
        ROW_NUMBER() OVER (ORDER BY co.Name, c.Name, cl.language) AS ID,
        CONCAT(c.ID, '-', cl.language) AS composite_key,
        c.ID as city_id,
        c.Name as city_name,
        cl.language,
        co.Name as country_name,
        c.CountryCode,
        cl.isofficial as language_isofficial,
        cl.percentage as language_percentage,
        c.District as city_district,
        c.population as city_population,
        co.continent,
        co.region,
        co.surfacearea,
        co.IndepYear,
        co.population as country_population,
        co.LifeExpectancy,
        co.GNPOld,
        co.localname as country_local_name,
        co.governmentform,
        co.headofstate,
        co.capital as country_capital,
        co.code2 as countrycode_2
    FROM countrylanguage cl
    JOIN country co
    ON co.code = cl.countrycode
    JOIN city c
    ON c.countrycode = cl.countrycode
    # ORDER BY city_ID, language_percentage DESC
    ORDER BY 1
)


select distinct countrycode as code
from validation
ORDER BY countrycode
"""
df_country2 = spark.read.format("jdbc").\
  option("url", src_url).\
  option("driver", "com.mysql.jdbc.Driver").\
  option("useUnicode", "true").\
  option("continueBatchOnError","true").\
  option("useSSL", "false").\
  option("user", uid).\
  option("password", pwd).\
  option("query",sql_country2).\
  load()

In [20]:
df_country1.count(), df_country2.count()

(239, 232)

In [21]:
df_country_val = df_country1.union(df_country2)

In [22]:
df_country_val.groupBy('code').count().filter(col("count") < 2).show()

+----+-----+
|code|count|
+----+-----+
| IOT|    1|
| UMI|    1|
| ATA|    1|
| BVT|    1|
| SGS|    1|
| HMD|    1|
| ATF|    1|
+----+-----+



we can see the missing territories don't have population, so we will leave them out

In [23]:
sql = """
select *
from country
where code in ('IOT','UMI','ATA','BVT','SGS','HMD','ATF')
"""

execute_sql_query(spark, sql, src_driver, uid, pwd, src_url)

+----+--------------------+-------------+--------------------+-----------+---------+----------+--------------+----+------+--------------------+--------------------+--------------------+-------+-----+
|Code|                Name|    Continent|              Region|SurfaceArea|IndepYear|Population|LifeExpectancy| GNP|GNPOld|           LocalName|      GovernmentForm|         HeadOfState|Capital|Code2|
+----+--------------------+-------------+--------------------+-----------+---------+----------+--------------+----+------+--------------------+--------------------+--------------------+-------+-----+
| ATA|Antarctica       ...|Antarctica   |Antarctica       ...|13120000.00|     NULL|         0|          NULL|0.00|  NULL|–                ...|Co-administrated ...|                 ...|   NULL|   AQ|
| ATF|French Southern t...|Antarctica   |Antarctica       ...|    7780.00|     NULL|         0|          NULL|0.00|  NULL|Terres australes ...|Nonmetropolitan T...|Jacques Chirac   ...|   NULL|   TF|


#### Load result back to Database - ETL Function

In [24]:
def ETL(spark, src_driver, target_driver, src_url, target_url, uid, pwd, sql_transform, target_db, target_table):
    try:
        # Perform the SQL transformation on the source database tables and convert result to spark dataframe
        df_transformed = spark.read \
            .format("jdbc") \
            .options(driver=src_driver, user=uid, password=pwd, url=src_url, query=sql_transform) \
            .load()
        print("Extraction and Transformation successful")

    except Exception as e:
        print(f"Error during extraction/transformation: {e}")
        return        

    try:
        # Load the transformed data into the MySQL database
        df_transformed.write \
            .format("jdbc") \
            .options(
                driver=target_driver,
                user=uid,
                password=pwd,
                url=target_url,
                dbtable=f"{target_db}.{target_table}"
            ) \
            .mode("overwrite") \
            .save()
        print(f"Data successfully loaded into {target_db}.{target_table}")

    except Exception as e:
        print(f"Error during loading data: {e}")
        return             

In [25]:
ETL(spark, src_driver, target_driver, src_url, target_url, uid, pwd, sql_transform, target_db, target_table)

Extraction and Transformation successful
Data successfully loaded into world_obt.city_language
