This code defines a function that loads a CSV file into a MySQL table. The data passes through a PyArrow Table and an intermediate Parquet file before pandas writes it to the database, and the function catches and logs errors raised during loading.

The code is explained line by line below:

1. `import pandas as pd`: Imports the pandas library and aliases it as `pd` for easier use in the code.

2. `from sqlalchemy import create_engine`: Imports the `create_engine` function from the SQLAlchemy library, which is used to create a connection to a SQL database.

3. `from dotenv import load_dotenv`: Imports the `load_dotenv` function from the `dotenv` library, which is used to load environment variables from a .env file into the environment.

4. `import os`: Imports the `os` module, which provides a way to interact with the operating system, including accessing environment variables.

5. `import logging`: Imports the `logging` module, which allows for logging messages during the execution of the code.

6. `from sqlalchemy.exc import SQLAlchemyError`: Imports the `SQLAlchemyError` exception class from SQLAlchemy, which is used to handle errors related to database operations.

7. `import pyarrow.parquet as pq`: Imports the `pyarrow.parquet` module as `pq`, which provides functions for working with Parquet files.

8. `import pyarrow as pa`: Imports the `pyarrow` library and aliases it as `pa`, which is used for data interchange and serialization.

9. `load_dotenv(override=True, encoding='utf-16')`: Calls the `load_dotenv` function to load environment variables from a .env file. The `override=True` parameter indicates that existing environment variables should be overridden, and `encoding='utf-16'` specifies the encoding to use when reading the .env file.

10. `database_url = os.getenv('DATABASE_URL')`: Retrieves the value of the 'DATABASE_URL' environment variable using `os.getenv` and assigns it to the `database_url` variable. This variable presumably contains the URL for connecting to a MySQL database.

11. `def load_csv_to_mysql_parquet(file_path, table_name, database_url):`: Defines a function named `load_csv_to_mysql_parquet` that takes three parameters: `file_path` (path to the CSV file), `table_name` (name of the MySQL table to load the data into), and `database_url` (URL of the MySQL database).

12. `try:`: Begins a try-except block to handle potential exceptions that may occur during the execution of the code.

13. `engine = create_engine(database_url, pool_size=10, max_overflow=20)`: Creates a SQLAlchemy engine with `create_engine`, using `database_url` as the connection URL. `pool_size=10` sets the number of connections the pool keeps open, and `max_overflow=20` allows up to 20 additional temporary connections beyond `pool_size` when demand exceeds it.

14. `df = pd.read_csv(file_path)`: Uses pandas (`pd`) to read the CSV file specified by `file_path` into a pandas DataFrame (`df`).

15. `table = pa.Table.from_pandas(df)`: Converts the pandas DataFrame `df` into a PyArrow Table using `pa.Table.from_pandas`.

16. `pq.write_table(table, 'temp.parquet')`: Writes the PyArrow Table `table` to a Parquet file named 'temp.parquet' using `pq.write_table`.

17. `df_new = pd.read_parquet('temp.parquet')`: Reads the Parquet file 'temp.parquet' into a new pandas DataFrame `df_new` using `pd.read_parquet`.

18. `df_new.to_sql(table_name, con=engine, if_exists='append', chunksize=100000, index=False)`: Uses the `to_sql` function of the pandas DataFrame `df_new` to insert data into the MySQL table specified by `table_name`. The `con=engine` parameter specifies the SQLAlchemy engine for the connection, `if_exists='append'` appends the data if the table already exists, `chunksize=100000` specifies the number of rows to insert per batch, and `index=False` indicates not to include the DataFrame index as a column in the MySQL table.

19. `print(f"Data loaded successfully into the '{table_name}' table." )`: Prints a success message indicating that the data was loaded successfully into the specified MySQL table.

20. Various `except` blocks: These blocks handle specific exceptions that may occur during execution: `FileNotFoundError` when the CSV file is not found, `pd.errors.EmptyDataError` when the CSV file is empty, `SQLAlchemyError` for database-related errors, and a generic `Exception` block for anything else. Each block logs an error message with the `logging` module. None of them re-raises, so the function returns normally after logging and the caller is not notified of the failure.

21. `finally:`: Begins a block of code that will always be executed, regardless of whether an exception occurs or not.

22. `if 'engine' in locals(): engine.dispose()`: Checks whether the `engine` variable is defined in the function's local scope and, if so, calls `engine.dispose()` to close the engine's pooled connections and release their resources. The check matters because `engine` is never bound if `create_engine` itself raises.
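
The `locals()` check in step 22 can also be avoided by binding the name to `None` before the `try`, so that `finally` tests it directly. The sketch below illustrates that pattern only; `FakeEngine` and `run_with_cleanup` are hypothetical stand-ins used so the example runs without a database, and they are not part of SQLAlchemy.

```python
class FakeEngine:
    """Hypothetical stand-in for a SQLAlchemy engine; it only tracks disposal."""

    def __init__(self):
        self.disposed = False

    def dispose(self):
        self.disposed = True


def run_with_cleanup(make_engine, work):
    """Create an engine, run work(engine), and always dispose of the engine.

    Returns the engine (or None if creation failed) so callers can inspect it.
    """
    engine = None  # bound up front, so `finally` never needs locals()
    try:
        engine = make_engine()
        work(engine)
    except Exception as exc:
        # Mirrors the original function: report the error instead of re-raising.
        print(f"An unexpected error occurred: {exc}")
    finally:
        if engine is not None:
            engine.dispose()
    return engine
```

In the real function, `make_engine` would correspond to the `create_engine(...)` call and `work` to the read, convert, and `to_sql` steps.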

In [42]:
!pip3 install mysqlclient
!pip install python-dotenv
!pip install pymysql



In [43]:
import pandas as pd
from sqlalchemy import create_engine
from dotenv import load_dotenv
import os
import logging
from sqlalchemy.exc import SQLAlchemyError
import pyarrow.parquet as pq
import pyarrow as pa

# Load environment variables from the .env file (saved as UTF-16)
load_dotenv(override=True, encoding='utf-16')

# Read the database connection URL from the environment
database_url = os.getenv('DATABASE_URL')

def load_csv_to_mysql_parquet(file_path, table_name, database_url):
    try: 
        # Create a connection to the MySQL database using the provided database URL
        engine = create_engine(database_url, pool_size=10, max_overflow=20)

        # Read the data from the csv file into a pandas dataframe
        df = pd.read_csv(file_path)

        # Convert the pandas dataframe to a PyArrow Table
        table = pa.Table.from_pandas(df)

        # Write the PyArrow Table to a Parquet file
        pq.write_table(table, 'temp.parquet')

        # Read the Parquet file into a new pandas dataframe
        df_new = pd.read_parquet('temp.parquet')

        # Use pandas to_sql function to load data from DataFrame into MySQL table
        df_new.to_sql(table_name, con=engine, if_exists='append', chunksize=100000, index=False)

        print(f"Data loaded successfully into the '{table_name}' table." )
        
    except FileNotFoundError as e:
        logging.error(f"Error: CSV file '{file_path}' not found. {e}")
    
    except pd.errors.EmptyDataError as e:
        logging.error(f"Error: CSV file '{file_path}' is empty. {e}")
        
    except SQLAlchemyError as e:
        logging.error(f"SQLAlchemy Error: {e}")
            
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")
    
    finally:
        if 'engine' in locals():
            engine.dispose()
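
`os.getenv` returns `None` when a variable is not set, so a missing `DATABASE_URL` would only surface later, inside `create_engine`. A minimal sketch of an earlier check follows; `require_env` is a hypothetical helper, not part of `python-dotenv` or the standard library.

```python
import os


def require_env(name):
    """Return the value of environment variable `name`, or raise if it is unset or empty."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Environment variable '{name}' is not set")
    return value
```

It could replace the plain lookup, for example `database_url = require_env('DATABASE_URL')`, once `load_dotenv` has run.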

In `load_csv_to_mysql_parquet`, `pd.read_csv(file_path)` returns a pandas DataFrame. The DataFrame is converted to a PyArrow Table and written to a Parquet file. The Parquet file is then read back into a new pandas DataFrame, which is written to the MySQL table with the `to_sql` method.

Note that this approach writes and reads a temporary Parquet file (`temp.parquet`) in the working directory, so make sure there is enough disk space for it.
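
The function writes to a fixed `temp.parquet` and does not delete it afterwards, so the file remains on disk after each run. One way to scope the intermediate file, sketched here with the standard library only, is a context manager that hands out a path inside a temporary directory and removes it on exit. `temporary_parquet_path` is a hypothetical helper; in the function above, the path it yields would be passed to `pq.write_table` and `pd.read_parquet` in place of `'temp.parquet'`.

```python
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def temporary_parquet_path(name="temp.parquet"):
    """Yield a file path inside a fresh temporary directory.

    The directory and anything written into it are deleted on exit,
    even if the body of the `with` block raises.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        yield os.path.join(tmp_dir, name)


# Demonstration with plain bytes standing in for the Parquet writer.
with temporary_parquet_path() as path:
    with open(path, "wb") as handle:
        handle.write(b"placeholder bytes, not real Parquet data")
    existed_inside = os.path.exists(path)

# Report whether the file existed inside the block and whether it still exists now.
print(existed_inside, os.path.exists(path))
```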

#### Load users data into the Users Table

In [44]:
# Usage: call the function
file_path = "C:/Users/Bookie/Documents/Dufuna_Documentation/CapstoneProject/data_tables/users.csv"
table_name = "users"
database_url = os.getenv('DATABASE_URL')

if __name__=="__main__":
    load_csv_to_mysql_parquet(file_path, table_name, database_url)

Data loaded successfully into the 'users' table.


**Load events data into the Events Table**

In [45]:
file_path = "C:/Users/Bookie/Documents/Dufuna_Documentation/CapstoneProject/data_tables/events.csv"
table_name = "events"
database_url = os.getenv('DATABASE_URL')

if __name__=="__main__":
    load_csv_to_mysql_parquet(file_path, table_name, database_url)

Data loaded successfully into the 'events' table.


**Load distribution centers data into the Distribution_Centers Table**

In [46]:
file_path = "C:/Users/Bookie/Documents/Dufuna_Documentation/CapstoneProject/data_tables/distribution_centers.csv"
table_name = "distribution_centers"
database_url = os.getenv('DATABASE_URL')

if __name__=="__main__":
    load_csv_to_mysql_parquet(file_path, table_name, database_url)

Data loaded successfully into the 'distribution_centers' table.


**Load orders data into the Orders Table**

In [47]:
file_path = "C:/Users/Bookie/Documents/Dufuna_Documentation/CapstoneProject/data_tables/orders.csv"
table_name = "orders"
database_url = os.getenv('DATABASE_URL')

if __name__=="__main__":    
    load_csv_to_mysql_parquet(file_path, table_name, database_url)

Data loaded successfully into the 'orders' table.


**Load products data into the Products Table**

In [48]:
file_path = "C:/Users/Bookie/Documents/Dufuna_Documentation/CapstoneProject/data_tables/products.csv"
table_name = "products"
database_url = os.getenv('DATABASE_URL')

if __name__=="__main__":    
    load_csv_to_mysql_parquet(file_path, table_name, database_url)

Data loaded successfully into the 'products' table.


**Load inventory items data into the Inventory_Items Table**

In [49]:
file_path = "C:/Users/Bookie/Documents/Dufuna_Documentation/CapstoneProject/data_tables/inventory_items.csv"
table_name = "inventory_items"
database_url = os.getenv('DATABASE_URL')

if __name__=="__main__":    
    load_csv_to_mysql_parquet(file_path, table_name, database_url)

Data loaded successfully into the 'inventory_items' table.


**Load order items data into the Order_Items Table**

In [50]:
file_path = "C:/Users/Bookie/Documents/Dufuna_Documentation/CapstoneProject/data_tables/order_items.csv"
table_name = "order_items"
database_url = os.getenv('DATABASE_URL')

if __name__=="__main__":    
    load_csv_to_mysql_parquet(file_path, table_name, database_url)

Data loaded successfully into the 'order_items' table.
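
The seven cells above differ only in the table name, and each CSV file is named after its table. A loop could express the same sequence; the sketch below assumes that naming convention holds. `load_all_tables` and its `loader` parameter are hypothetical: `loader` is expected to be a callable with the same signature as `load_csv_to_mysql_parquet`, and a recording stand-in is used here so the example runs without a database.

```python
from pathlib import Path

# Table names in the order they are loaded in this notebook.
TABLE_NAMES = [
    "users", "events", "distribution_centers", "orders",
    "products", "inventory_items", "order_items",
]


def load_all_tables(data_dir, table_names, database_url, loader):
    """Call loader(file_path, table_name, database_url) once per table.

    Assumes each table's CSV is stored as <data_dir>/<table_name>.csv.
    """
    for table_name in table_names:
        file_path = Path(data_dir) / f"{table_name}.csv"
        loader(str(file_path), table_name, database_url)


# Stand-in loader that only records its arguments (no database needed).
calls = []
load_all_tables(
    "data_tables",                # hypothetical folder name
    TABLE_NAMES,
    "hypothetical-database-url",  # placeholder, not a real connection URL
    lambda file_path, table_name, url: calls.append((file_path, table_name)),
)
```

With the real function, the call would be `load_all_tables(data_dir, TABLE_NAMES, database_url, load_csv_to_mysql_parquet)`, where `data_dir` is the folder holding the CSV files.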
