# Exercise

In this exercise we will upload the audiostore database from MySQL to Impala. We will do it in the following steps:

1. Locate/create the database in MySQL
2. Create/clean the staging folder in HDFS
3. Make the list of tables in the database
4. Upload the tables to the staging folder as Parquet files
5. Load the data into Impala
6. Query your data

## Part 1

> Make sure you have the audiostore database in MySQL

Use `import os` and `os.system` (or the notebook `!` shortcut) to run CLI commands that create the database and run the `AudioStoreDB.sql` script.

In [1]:
import os

# os.system('echo blah')
!echo blah

# os.system("mysql -unaya -pNayaPass1!  -e 'show databases;' ")
!mysql -unaya -pNayaPass1!  -e 'show databases;' 

# # drop database if exists
# os.system("sudo mysql -unaya -pNayaPass1!  -e 'DROP DATABASE IF EXISTS audiostore;' ")
!sudo mysql -unaya -pNayaPass1!  -e 'DROP DATABASE IF EXISTS audiostore;'

# # create database audiostore
# os.system("sudo mysql -unaya -pNayaPass1!  -e 'create database audiostore;' ")
!sudo mysql -unaya -pNayaPass1!  -e 'create database audiostore;'

# run AudioStoreDB.sql against the new database
# os.system("sudo mysql -unaya -pNayaPass1!  audiostore<AudioStoreDB.sql ")
!sudo mysql -unaya -pNayaPass1!  audiostore<AudioStoreDB.sql



blah
+--------------------+
| Database           |
+--------------------+
| information_schema |
| amon               |
| audiostore         |
| classicmodels      |
| hue                |
| metastore          |
| mysql              |
| nav                |
| navms              |
| oozie              |
| performance_schema |
| rman               |
| scm                |
| sentry             |
| sys                |
+--------------------+
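
`os.system` only hands back an exit status, and the `!` shortcut works only inside IPython. As a hedged alternative, the sketch below wraps `subprocess.run` so that a failing command raises an error instead of passing silently. The helper name `run_cli` is an assumption made for illustration, and the demo command calls the Python interpreter itself (a stand-in for `echo blah`) so that it runs on any machine.

```python
import subprocess
import sys


def run_cli(args, stdin_path=None):
    """Run a command given as a list of arguments and return its stdout as text.

    Raises subprocess.CalledProcessError when the exit code is non-zero.
    `stdin_path` optionally names a file that is fed to the command's
    standard input (the equivalent of `< file` in the shell).
    """
    stdin = open(stdin_path, "rb") if stdin_path else None
    try:
        result = subprocess.run(
            args,
            stdin=stdin,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,  # turn a non-zero exit code into an exception
        )
    finally:
        if stdin is not None:
            stdin.close()
    return result.stdout.decode()


# Portable stand-in for `echo blah`.
output = run_cli([sys.executable, "-c", "print('blah')"])
print(output.strip())  # → blah
```

On the course machine, a call such as `run_cli(["sudo", "mysql", "-unaya", "-pNayaPass1!", "audiostore"], stdin_path="AudioStoreDB.sql")` would mirror the last command of the cell above.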


## Part 2
We are going to use the temporary folder /tmp/staging/ in HDFS for storing the tables before we load them into
Impala. Make sure this folder exists (if not, create it) and that it is empty.


- Use pyarrow to create a HadoopFileSystem instance (usually called fs) to control HDFS functionalities.
- Use simple Python scripting with the exists(), mkdir() and delete() methods.

Don't forget `import pyarrow as pa`, or alternatively use the file `hadoop_config.py`, which holds all the definitions, via `import hadoop_config`.

In [2]:
# import pyarrow as pa
from hadoop_config import fs, mysql_cnx, hdfs, client as impala_client, hive_cnx

# fs = pa.hdfs.HadoopFileSystem(
#     host='Cnt7-naya-cdh63', #or internal-ip
#     port=8020,
#     user='hdfs',
#     kerb_ticket=None,
#     extra_conf=None)

if fs.exists('/tmp/staging'):
    fs.delete('/tmp/staging', recursive=True)
fs.mkdir('/tmp/staging', recursive=True)
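
The exists/delete/mkdir sequence can be wrapped in a small helper so that it is reusable for other folders. The sketch below is not part of the original solution: `reset_dir` and the `LocalFS` class are hypothetical names, and `LocalFS` is only a local-disk stand-in that mimics the three methods used above, so the example runs without a cluster.

```python
import os
import shutil
import tempfile


class LocalFS:
    """Hypothetical local-disk stand-in exposing exists(), mkdir() and delete()."""

    def exists(self, path):
        return os.path.exists(path)

    def mkdir(self, path, recursive=True):
        # os.makedirs always creates missing parents, so `recursive` is ignored here.
        os.makedirs(path, exist_ok=True)

    def delete(self, path, recursive=False):
        if os.path.isdir(path):
            if recursive:
                shutil.rmtree(path)
            else:
                os.rmdir(path)
        else:
            os.remove(path)


def reset_dir(fs, path):
    """Leave `path` existing and empty, whatever state it was in before."""
    if fs.exists(path):
        fs.delete(path, recursive=True)
    fs.mkdir(path, recursive=True)


# Demo on a throwaway local directory that already holds a file.
staging = os.path.join(tempfile.mkdtemp(), "staging")
os.makedirs(staging)
open(os.path.join(staging, "old.parquet"), "w").close()

reset_dir(LocalFS(), staging)
print(os.listdir(staging))  # → []
```

With the real `fs` object from `hadoop_config`, the call would presumably be `reset_dir(fs, '/tmp/staging')`.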



## Part 3
In this step we get the list of tables in the database, so that we can
iterate over them and upload each one as a Parquet file.

- Use any RDBMS API to connect to the database. SQLAlchemy is a good option, but consider using a MySQL connector such as PyMySQL, which is already installed (connectors are explained in more detail at W3Schools).
- Execute an SQL query to get all the names of the tables in the database, and then use the fetchall() method to turn the result into a list.

Remember that the file `hadoop_config.py` already contains all the definitions we need, so use `from hadoop_config import mysql_cnx` (already imported in Part 2).

In [3]:
import pandas as pd
tables = pd.read_sql("SHOW TABLES", mysql_cnx)
tables

Unnamed: 0,Tables_in_audiostore
0,album
1,artist
2,customer
3,employee
4,genre
5,invoice
6,invoiceline
7,mediatype
8,playlist
9,playlisttrack
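
The cell above uses pandas, while the hint mentions `fetchall()`. A cursor-based version could look like the sketch below. It is written against `sqlite3` only so that it runs without a MySQL server (an assumption for illustration); both drivers follow the Python DB-API, so the cursor calls have the same shape. The two demo tables are made up.

```python
import sqlite3

# sqlite3 stands in for the MySQL connection; both follow the Python DB-API.
cnx = sqlite3.connect(":memory:")
cnx.execute("CREATE TABLE album (id INTEGER)")
cnx.execute("CREATE TABLE artist (id INTEGER)")

cursor = cnx.cursor()
# With MySQL the query would be "SHOW TABLES"; SQLite keeps its table
# names in the sqlite_master catalog instead.
cursor.execute("SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name")
rows = cursor.fetchall()                 # list of one-element tuples
table_names = [row[0] for row in rows]   # flatten to a plain list of names
cursor.close()

print(rows)         # → [('album',), ('artist',)]
print(table_names)  # → ['album', 'artist']
```

The flattening step matters because `fetchall()` returns one tuple per row even when the query selects a single column.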


## Part 4

Import all the tables from the audiostore database into the staging folder using the Parquet format.


- There is going to be a for-loop on the list of tables. Each table will be converted to a Parquet file, which will be uploaded to our HDFS staging area.
- I couldn't find a direct option to make this transition, so I've added an intermediate staging step through pandas.DataFrame. Since pandas is so popular, it is easier to first read the tables as pandas.DataFrame (using read_sql()) and then write them as Parquet (using write_table()).
- Parquet does not support complex datetime types, so you might have to manipulate your data so that Parquet will "eat it".
- Parquet files are binary, not text, so there is no text encoding to choose: open the target file in binary mode (`"wb"`) and treat the content as a stream of bytes.

These imports are useful:
```
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
```


In [5]:
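import pyarrow.parquet as pq
import pyarrow as pa

batch_size = 1000
for table in tables.squeeze().to_list():
    print(table)
    path_table = '/tmp/staging/' + table + '.parquet'
    writer = None
    with fs.open(path_table, "wb") as fw:
        for i, chunk_df in enumerate(pd.read_sql(f"SELECT * FROM {table}", mysql_cnx, chunksize=batch_size)):
            print(f"\t chunk {i}")
            # Reuse the schema of the first chunk so that all chunks agree on column types.
            schema = writer.schema if writer is not None else None
            chunk_table = pa.Table.from_pandas(chunk_df, schema=schema, preserve_index=False)
            if writer is None:
                # One writer per file: each chunk is appended as a row group.
                # Calling pq.write_table() per chunk would write a complete Parquet file every time.
                writer = pq.ParquetWriter(fw, chunk_table.schema)
            writer.write_table(chunk_table)
        if writer is not None:
            writer.close()  # writes the Parquet footer before the HDFS file is closed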
            

album
	 chunk 0
artist
	 chunk 0
customer
	 chunk 0
employee
	 chunk 0
genre
	 chunk 0
invoice
	 chunk 0
invoiceline
	 chunk 0
	 chunk 1
	 chunk 2
mediatype
	 chunk 0
playlist
	 chunk 0
playlisttrack
	 chunk 0
	 chunk 1
	 chunk 2
	 chunk 3
	 chunk 4
	 chunk 5
	 chunk 6
	 chunk 7
	 chunk 8
track
	 chunk 0
	 chunk 1
	 chunk 2
	 chunk 3
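
The chunk counts in the output follow directly from `batch_size`: a table that prints three chunks at 1,000 rows per chunk holds between 2,001 and 3,000 rows. The sketch below reproduces that batching with a plain DB-API cursor and `fetchmany()`. It uses `sqlite3` and a made-up row count of 2,500 so that it runs without MySQL; the helper name `iter_batches` is an assumption for illustration.

```python
import sqlite3


def iter_batches(cursor, batch_size):
    """Yield lists of at most `batch_size` rows until the cursor is exhausted."""
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        yield rows


# Build a throwaway table with 2,500 rows (an arbitrary demo size).
cnx = sqlite3.connect(":memory:")
cnx.execute("CREATE TABLE invoiceline (id INTEGER)")
cnx.executemany("INSERT INTO invoiceline VALUES (?)", [(i,) for i in range(2500)])

cursor = cnx.execute("SELECT * FROM invoiceline")
batch_sizes = [len(batch) for batch in iter_batches(cursor, 1000)]
print(batch_sizes)  # → [1000, 1000, 500]
```

Reading in batches keeps memory use bounded by the batch size rather than by the size of the table.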


## Part 5
In order for Impala to query these tables, they have to be
defined inside an Impala database. Create a new database under `/user/hive/warehouse/` (the exercise calls it my_audiostore; the solution below names it audiostore) and load the data
from the staging folder into it.

In [7]:
stg = '/tmp/staging/'
impala_path = '/user/hive/warehouse/audiostore/'
fs.chown(impala_path, group='naya', owner='impala')

impala_client.create_database('audiostore',path=impala_path,force=True)
impala_client.set_database('audiostore')
impala_client.raw_sql('INVALIDATE METADATA', results=False)
#INVALIDATE METADATA Statement
#https://impala.apache.org/docs/build/html/topics/impala_invalidate_metadata.html

tables = pd.read_sql("SHOW TABLES", mysql_cnx).squeeze().to_list()  # the mysql tables
for table in tables:
    print("starting ",table , end="...")
    #change dir owner
    fs.chown(stg, group='naya',owner='impala')
    
    #create tables in impala- only schema first
    pf = impala_client.parquet_file(hdfs_dir = f'{stg}', like_file = f'{stg}{table}.parquet')
    impala_client.create_table(table,
                        schema=pf.schema(),
                        database='audiostore',
                        external=True,
                        force=True,
                        format='parquet',
                        location=f'{impala_path}{table}.parquet',
                        like_parquet=None)

    impala_client.raw_sql('INVALIDATE METADATA', results=False)
    #5. Load the data into Impala
    impala_client.load_data(table_name =table,
                     path = f'{stg}{table}.parquet',
                     database='audiostore',
                     overwrite=True, partition=None
                    )
    print("finish")
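
For orientation, the client calls in the loop end up as Impala SQL. The sketch below renders statements that are roughly equivalent to the `create_table` and `load_data` calls for one table; they are an approximation written for illustration, not necessarily the exact text the client emits. The function name `impala_statements` is hypothetical.

```python
def impala_statements(table, database, staging_dir, warehouse_dir):
    """Return (create_sql, load_sql) for one staged Parquet file."""
    staged_file = f"{staging_dir.rstrip('/')}/{table}.parquet"
    table_dir = f"{warehouse_dir.rstrip('/')}/{table}.parquet"
    # The schema is inferred from the staged Parquet file itself.
    create_sql = (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {database}.{table} "
        f"LIKE PARQUET '{staged_file}' "
        f"STORED AS PARQUET "
        f"LOCATION '{table_dir}'"
    )
    # LOAD DATA moves the staged file into the table's directory.
    load_sql = (
        f"LOAD DATA INPATH '{staged_file}' "
        f"OVERWRITE INTO TABLE {database}.{table}"
    )
    return create_sql, load_sql


create_sql, load_sql = impala_statements(
    "album", "audiostore", "/tmp/staging/", "/user/hive/warehouse/audiostore/"
)
print(create_sql)
print(load_sql)
```

Note that `LOAD DATA INPATH` moves the file rather than copying it, so the staging folder is emptied table by table as the loop progresses.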



In [8]:
tables

Unnamed: 0,Tables_in_audiostore
0,album
1,artist
2,customer
3,employee
4,genre
5,invoice
6,invoiceline
7,mediatype
8,playlist
9,playlisttrack
