In [1]:
pip install mysql-connector-python pandas pandas-gbq

Note: you may need to restart the kernel to use updated packages.


In [2]:
import datetime
import getpass
import os

import mysql.connector as connection
import pandas as pd

def data_pipeline_mysql_to_bq(**kwargs):
    """Copy every table of a MySQL schema into a BigQuery dataset.

    Every table listed in ``information_schema.tables`` for the schema is
    extracted with ``extract_table_from_mysql``, transformed with
    ``transform_data_from_table`` and written to ``<dataset>.<table_name>``
    with ``load_data_into_bigquery``.

    Keyword Args:
        mysql_host, mysql_database, mysql_user, mysql_password: MySQL
            connection settings (missing keys are passed on as ``None``).
        bq_project_id: Google Cloud project that owns the target dataset.
        dataset: Target BigQuery dataset name.

    Any exception is printed rather than re-raised, and it stops processing
    of the remaining tables. The MySQL connection is always closed if it was
    opened.
    """
    mysql_host = kwargs.get('mysql_host')
    mysql_database = kwargs.get('mysql_database')
    mysql_user = kwargs.get('mysql_user')
    mysql_password = kwargs.get('mysql_password')
    bq_project_id = kwargs.get('bq_project_id')
    dataset = kwargs.get('dataset')

    # Stays None if connect() itself fails, so the cleanup below cannot hit
    # an unbound name and mask the original error.
    mydb = None
    try:
        mydb = connection.connect(host=mysql_host,
                                  database=mysql_database,
                                  user=mysql_user,
                                  passwd=mysql_password,
                                  use_pure=True)

        # Bind the schema name as a query parameter instead of formatting it
        # into the SQL text.
        all_tables = ("SELECT table_name FROM information_schema.tables "
                      "WHERE table_schema = %s")
        df_tables = pd.read_sql(all_tables, mydb, params=(mysql_database,))

        # The result column's label case differs between MySQL versions
        # (TABLE_NAME vs table_name), so read the single column by position.
        for table_name in df_tables.iloc[:, 0]:
            # Extract table data from MySQL
            df_table_data = extract_table_from_mysql(table_name, mydb)

            # Transform table data from MySQL
            df_table_data = transform_data_from_table(df_table_data)

            # Load data to BigQuery
            load_data_into_bigquery(bq_project_id, dataset,
                                    table_name, df_table_data)

            # Show confirmation message
            print("Ingested table {}".format(table_name))
    except Exception as e:
        print(str(e))
    finally:
        if mydb is not None:
            mydb.close()  # close the connection on success and on error

In [3]:
def extract_table_from_mysql(table_name, my_sql_connection):
    """Extraction step of the ETL job: read an entire MySQL table.

    Args:
        table_name: Unqualified table name, e.g. as listed in
            ``information_schema.tables`` (no ``schema.`` prefix, because the
            whole value is quoted as a single identifier).
        my_sql_connection: Open DB-API connection handed to ``pd.read_sql``
            (the MySQL connection opened by the pipeline).

    Returns:
        pandas.DataFrame holding every row and column of the table.
    """
    # Backtick-quote the identifier and double any embedded backticks, so
    # table names that are reserved words (e.g. `order`) or contain special
    # characters still produce valid SQL.
    quoted_name = '`{}`'.format(str(table_name).replace('`', '``'))
    extraction_query = 'select * from ' + quoted_name
    return pd.read_sql(extraction_query, my_sql_connection)

In [4]:
def transform_data_from_table(df_table_data):
    """Transformation step of the ETL job: convert ``datetime.date`` columns to strings.

    An object column is treated as a date column when its first non-null
    value is exactly a ``datetime.date``. Every non-null value in such a
    column is replaced by ``str(value)``, and nulls stay null rather than
    becoming the string ``'None'``. Empty and all-null columns are left
    unchanged.

    Args:
        df_table_data: DataFrame produced by the extraction step.

    Returns:
        A new DataFrame; the input frame is not modified.
    """
    df_out = df_table_data.copy()
    for column in df_out.select_dtypes(include=['object']).columns:
        non_null = df_out[column].dropna()
        if non_null.empty:
            # Nothing to inspect (empty table or all-NULL column).
            continue
        # Exact type check: datetime.datetime subclasses datetime.date and
        # must not be matched here.
        if type(non_null.iloc[0]) is datetime.date:
            df_out[column] = df_out[column].map(
                lambda value: None if pd.isna(value) else str(value))
    return df_out

In [5]:
def load_data_into_bigquery(bq_project_id, dataset, table_name, df_table_data,
                            if_exists='replace'):
    """Load step of the ETL job: write a DataFrame to BigQuery via pandas-gbq.

    Args:
        bq_project_id: Google Cloud project id passed to ``to_gbq``.
        dataset: Target dataset name.
        table_name: Target table name inside ``dataset``.
        df_table_data: DataFrame to upload.
        if_exists: Forwarded to ``pandas_gbq.to_gbq`` ('fail', 'replace' or
            'append'). The default 'replace' fully refreshes the table on
            every run.
    """
    # Imported lazily; the package is installed by the first cell.
    import pandas_gbq as pdbq

    destination_table = "{}.{}".format(dataset, table_name)
    pdbq.to_gbq(df_table_data, destination_table,
                project_id=bq_project_id, if_exists=if_exists)

In [6]:
# Call main function.
# Every setting can be overridden through an environment variable. The MySQL
# password is never stored in the notebook: it is read from MYSQL_PASSWORD,
# or prompted for interactively when that variable is unset or empty.
kwargs = {
    # BigQuery connection details
    'bq_project_id': os.environ.get('BQ_PROJECT_ID', 'dbt-analytics-engineer-458723'),
    'dataset': os.environ.get('BQ_DATASET', 'omnichannel_raw'),
    # MySQL connection details
    'mysql_host': os.environ.get('MYSQL_HOST', 'localhost'),
    'mysql_user': os.environ.get('MYSQL_USER', 'root'),
    'mysql_password': os.environ.get('MYSQL_PASSWORD') or getpass.getpass('MySQL password: '),
    'mysql_database': os.environ.get('MYSQL_DATABASE', 'OMNI_MANAGEMENT'),
}

data_pipeline_mysql_to_bq(**kwargs)

  df_tables = pd.read_sql(all_tables,mydb,
  df_table_data = pd.read_sql(extraction_query,my_sql_connection)


Ingested table channels


  df_table_data = pd.read_sql(extraction_query,my_sql_connection)


Ingested table customers


  df_table_data = pd.read_sql(extraction_query,my_sql_connection)


Ingested table products


  df_table_data = pd.read_sql(extraction_query,my_sql_connection)


Ingested table purchaseHistory


  df_table_data = pd.read_sql(extraction_query,my_sql_connection)


Ingested table visitHistory


In [None]:
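## Debug variant of the MySQL -> BigQuery pipeline with verbose connection and per-table diagnostics (kept commented out for reference)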

# import mysql.connector as connection
# import pandas as pd

# def data_pipeline_mysql_to_bq(**kwargs):
#     mysql_host = kwargs.get('mysql_host')
#     mysql_database = kwargs.get('mysql_database')
#     mysql_user = kwargs.get('mysql_user')
#     mysql_password = kwargs.get('mysql_password')
#     bq_project_id = kwargs.get('bq_project_id')
#     dataset = kwargs.get('dataset')

#     try:
#         print("Connecting to MySQL database...")
#         mydb = connection.connect(host=mysql_host,
#                                 database=mysql_database,
#                                 user=mysql_user,
#                                 passwd=mysql_password,
#                                 use_pure=True)
#         print("Connected successfully!")

#         all_tables = "Select table_name from information_schema.tables where table_schema = '{}'".format(mysql_database)
#         print(f"Executing query: {all_tables}")
        
#         df_tables = pd.read_sql(all_tables, mydb, 
#                              parse_dates={'Date': {'format': '%Y-%m-%d'}})
        
#         # Add diagnostic information
#         print("Tables found:", df_tables.shape[0])
#         print("Column names:", df_tables.columns.tolist())
        
#         # Check if the DataFrame has any rows
#         if df_tables.empty:
#             print("No tables found in the database")
#             mydb.close()
#             return
            
#         # Determine the correct column name
#         table_column = None
#         if 'table_name' in df_tables.columns:
#             table_column = 'table_name'
#             print("Using column 'table_name'")
#         elif 'TABLE_NAME' in df_tables.columns:
#             table_column = 'TABLE_NAME'
#             print("Using column 'TABLE_NAME'")
#         else:
#             print("Unable to find table name column. Available columns:", df_tables.columns.tolist())
#             mydb.close()
#             return
        
#         # Print first few tables for verification
#         print("First few tables:", df_tables[table_column].head().tolist())

#         for table in df_tables[table_column]:
#             table_name = table
#             print(f"\nProcessing table: {table_name}")

#             # Extract table data from MySQL
#             df_table_data = extract_table_from_mysql(table_name, mydb)
            
#             # Add diagnostic info for the table data
#             print(f"Extracted data shape: {df_table_data.shape}")
#             if df_table_data.empty:
#                 print(f"Table {table_name} is empty, skipping")
#                 continue

#             # Transform table data from MySQL
#             df_table_data = transform_data_from_table(df_table_data)

#             # Load data to BigQuery
#             load_data_into_bigquery(bq_project_id, dataset, table_name, df_table_data)

#             # Show confirmation message
#             print("Ingested table {}".format(table_name))

#         mydb.close()  # close the connection
#         print("MySQL connection closed.")
#     except Exception as e:
#         if 'mydb' in locals() and mydb.is_connected():
#             mydb.close()
#             print("MySQL connection closed due to error.")
#         print("Error:", str(e))


# def extract_table_from_mysql(table_name, my_sql_connection):
#     # Extract data from mysql table
#     print(f"Extracting data from table: {table_name}")
#     extraction_query = 'select * from ' + table_name
#     try:
#         df_table_data = pd.read_sql(extraction_query, my_sql_connection)
#         return df_table_data
#     except Exception as e:
#         print(f"Error extracting table {table_name}: {str(e)}")
#         return pd.DataFrame()  # Return empty DataFrame on error


# def transform_data_from_table(df_table_data):
#     # Clean dates - convert to string
#     print("Transforming data...")
#     try:
#         object_cols = df_table_data.select_dtypes(include=['object']).columns
#         print(f"Object columns to process: {len(object_cols)}")
        
#         for column in object_cols:
#             # Skip empty columns
#             if df_table_data[column].empty:
#                 print(f"Column {column} is empty, skipping")
#                 continue
                
#             # Check if the column has any non-null values
#             non_null_values = df_table_data[column].dropna()
#             if len(non_null_values) == 0:
#                 print(f"Column {column} has only null values, skipping")
#                 continue
                
#             # Safely get the first non-null value
#             first_value = non_null_values.iloc[0]
#             dtype = str(type(first_value))
#             print(f"Column {column}, first value type: {dtype}")
            
#             if dtype == "<class 'datetime.date'>":
#                 print(f"Converting column {column} from date to string")
#                 df_table_data[column] = df_table_data[column].map(lambda x: str(x) if x is not None else None)
                
#         return df_table_data
#     except Exception as e:
#         print(f"Error in transform: {str(e)}")
#         return df_table_data  # Return the original DataFrame if there's an error


# def load_data_into_bigquery(bq_project_id, dataset, table_name, df_table_data):
#     # Load data to BigQuery
#     print(f"Loading data to BigQuery: {dataset}.{table_name}")
#     try:
#         import pandas_gbq as pdbq
#         full_table_name_bg = "{}.{}".format(dataset, table_name)
#         pdbq.to_gbq(df_table_data, full_table_name_bg, project_id=bq_project_id, if_exists='replace')
#         print(f"Loaded {len(df_table_data)} rows to BigQuery")
#         return True
#     except Exception as e:
#         print(f"Error loading to BigQuery: {str(e)}")
#         return False


# # Call main function
# if __name__ == "__main__":
#     kwargs = {
#         # BigQuery connection details
#         'bq_project_id': 'dbt-analytics-engineer-458723',
#         'dataset': 'omnichannel_raw',
#         # MySQL connection details
#         'mysql_host': 'localhost',
#         'mysql_user': 'root',
#         'mysql_password': os.environ['MYSQL_PASSWORD'],  # never hardcode credentials
#         'mysql_database': 'OMNI_MANAGEMENT'
#     }
    
#     data_pipeline_mysql_to_bq(**kwargs)