# Don't Repeat Yourself (DRY)

> Elias Castellanos Alamilla

> April-2023

In [1]:
from sqlalchemy import create_engine, text
import pandas as pd
import pymysql

import numpy as np

from pylab import rcParams
from matplotlib.patches import Rectangle
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from matplotlib import cm
import matplotlib
import seaborn as sns

from statsmodels.tsa.seasonal import seasonal_decompose

from sklearn.metrics import silhouette_samples
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.decomposition import PCA
from sklearn.impute import SimpleImputer
import scipy.cluster.hierarchy as sch

from io import StringIO

import requests
import json
import urllib

## Data manipulation

In [2]:
def send_sql_data(db_connection: dict, df: pd.DataFrame, name: str):
    """
    Save a DataFrame as a new table in a MySQL database.

    Any failure is reported on stdout instead of being raised, so a failed
    upload does not interrupt the caller.

    Args:
        - db_connection: database connection credentials, read from the keys
          'host', 'user', 'password' and 'db'.
        - df: pandas DataFrame to upload.
        - name: new table name; the upload fails if the table already exists
          (``if_exists='fail'``).
    Returns:
        - None
    """
    # Function-scope import keeps the block self-contained.
    from urllib.parse import quote_plus

    engine = None
    try:
        # URL-escape the credentials so characters such as '@', ':' or '/'
        # in the user name or password cannot corrupt the connection URL.
        user = quote_plus(str(db_connection['user']))
        password = quote_plus(str(db_connection['password']))
        engine = create_engine(
            f"mysql+pymysql://{user}:{password}"
            f"@{db_connection['host']}/{db_connection['db']}"
        )

        df.to_sql(name, engine, if_exists='fail')

        print("Data send succesfully!")
    except Exception as e:
        # Deliberate best-effort: report the problem and carry on.
        print(f"Error sending data to SQL: {e}")
    finally:
        # Release the pooled connections held by the engine.
        if engine is not None:
            engine.dispose()

In [3]:
# Define the compare_and_aggregate function
def compare_and_aggregate(local_data, db_data_query, db_connection, table_name):
    """
    Compares local data with database data and aggregates new data to a MySQL table.

    A local row counts as new when its combination of values in the columns
    shared with the query result does not appear as a row of that result.
    New rows are inserted into ``table_name`` in a single transaction.

    Args:
        local_data (pandas DataFrame): Local data to compare.
        db_data_query (str): SQL query to retrieve database data.
        db_connection (dict): Dictionary containing MySQL database connection
            details (keys 'host', 'user', 'password', 'db').
        table_name (str): Name of the MySQL table that receives the new rows.

    Returns:
        None

    Raises:
        ValueError: If local_data is not a DataFrame, or if the query returns
            rows with a different number of columns than local_data.
    """
    # Check if local_data is a Pandas DataFrame
    if not isinstance(local_data, pd.DataFrame):
        raise ValueError("local_data must be a Pandas DataFrame")

    # Check if local_data has at least one row
    if local_data.empty:
        print("local_data is empty, no comparison or aggregation, check your data.")
        return None

    # Forward-fill missing values (NaN/NaT) with the previous observation.
    local_data = local_data.ffill()

    # Connect to MySQL database
    conn = pymysql.connect(
        host=db_connection['host'],
        user=db_connection['user'],
        password=db_connection['password'],
        db=db_connection['db'],
        cursorclass=pymysql.cursors.DictCursor
    )

    try:
        # Retrieve data from MySQL table
        with conn.cursor() as cursor:
            cursor.execute(db_data_query)
            result = cursor.fetchall()

        # Convert MySQL data to DataFrame
        db_df = pd.DataFrame(result)

        if db_df.empty:
            # No rows came back, so there is nothing to compare against (and
            # no column information either): every local row is new.
            new_data = local_data
        else:
            # Check if local_data has the same number of columns as db_df
            if local_data.shape[1] != db_df.shape[1]:
                raise ValueError("local_data has a different number of columns compared to database data")

            common_cols = list(set(local_data.columns) & set(db_df.columns))
            if common_cols:
                # Compare whole rows: testing each cell against the pool of
                # all database values would hide a new row whose individual
                # values merely occur somewhere in the table.
                existing_rows = set(
                    db_df[common_cols].itertuples(index=False, name=None)
                )
                is_new = [
                    row not in existing_rows
                    for row in local_data[common_cols].itertuples(index=False, name=None)
                ]
                new_data = local_data[is_new]
            else:
                # Without shared column names no comparison is possible;
                # nothing is inserted.
                new_data = local_data.iloc[0:0]

        # Aggregate new data to MySQL table
        if not new_data.empty:
            if 'MME' in new_data.columns:
                # Prices are stored with two decimals.
                new_data = new_data.assign(
                    MME=new_data['MME'].map(lambda value: round(value, 2))
                )

            # The statement is identical for every row, so build it once.
            cols = ', '.join(new_data.columns)
            vals = ', '.join(['%s'] * len(new_data.columns))
            query = f"INSERT INTO {table_name} ({cols}) VALUES ({vals})"
            rows = list(new_data.itertuples(index=False, name=None))

            with conn.cursor() as cursor:
                cursor.executemany(query, rows)
            conn.commit()
            print(f"{len(new_data)} rows of new data added to MySQL table.")
        else:
            print("No new data to add to MySQL table.")
    finally:
        # Close database connection
        conn.close()

In [4]:
def read_sql_data(db_connection: dict, tablename: str):
    """
    Load a whole table from a MySQL database.

    Args:
        - db_connection: database connection credentials, read from the keys
          'host', 'user', 'password' and 'db'.
        - tablename: table name to query (interpolated into the SELECT as-is).
    Returns:
        - pandas DataFrame.
    """
    # Function-scope import keeps the block self-contained.
    from urllib.parse import quote_plus

    # URL-escape the credentials so characters such as '@', ':' or '/' in the
    # user name or password cannot corrupt the connection URL.
    user = quote_plus(str(db_connection['user']))
    password = quote_plus(str(db_connection['password']))
    engine = create_engine(
        f"mysql+pymysql://{user}:{password}"
        f"@{db_connection['host']}/{db_connection['db']}"
    )
    query = f'SELECT * FROM {tablename}'

    try:
        # The context manager closes the connection even if the query fails.
        with engine.connect() as connection:
            df = pd.read_sql_query(text(query), con=connection)
    finally:
        # Release the pooled connections held by the engine.
        engine.dispose()

    return df

## Web Scraping

In [5]:
def download_bmx_timeseries(token, series, start_date: str):
    """
    Download a time series from the Banxico SIE REST API.

    The requested range starts at ``start_date`` and ends at the latest month
    start that is not after today; when no month start falls inside the range
    (``start_date`` lies after the first day of the current month) today's
    date is used as the end instead.

    :param token: API token Banxico, length 64 characters. https://www.banxico.org.mx/SieAPIRest/service/v1/token
    :param series: IDSerie, e.g. SF63528 (str). https://www.banxico.org.mx/SieAPIRest/service/v1/
    :param start_date: date format yyyy-mm-dd.
    :return: Pandas DataFrame indexed by 'fecha' with a float column 'dato',
        or None when the API does not answer with status 200.
    """
    today = pd.Timestamp.today()
    month_starts = pd.date_range(start=start_date, end=today, freq='MS')
    # An empty range would make month_starts[-1] raise IndexError.
    end_date = month_starts[-1].date() if len(month_starts) else today.date()

    url = f'https://www.banxico.org.mx/SieAPIRest/service/v1/series/{series}/datos/{start_date}/{end_date}'
    print(url)
    headers = {'Bmx-Token': token}
    # A timeout keeps an unresponsive server from blocking the caller forever.
    response = requests.get(url, headers=headers, timeout=30)
    status = response.status_code
    if status != 200:
        print("Error, status code: {}".format(status))
        return None

    raw_data = response.json()
    data = raw_data['bmx']['series'][0]['datos']
    df = pd.DataFrame(data)
    # 'N/E' entries become NaN.
    df.replace(to_replace='N/E', value='NaN', inplace=True)
    # Drop thousands separators before converting, in case the API returns
    # formatted numbers such as '1,234.56' (assumption, confirm against the API).
    df['dato'] = df['dato'].map(lambda value: float(str(value).replace(',', '')))
    df['fecha'] = pd.to_datetime(df['fecha'], format='%d/%m/%Y')
    df.set_index('fecha', inplace=True)

    return df

In [6]:
def get_mme_prices():
    """
    Download the MME price series from Banxico; no token required.

    Args:
        - No args.
    Return:
        - pandas DataFrame indexed by 'fecha' with a single column 'MME';
          the source's -989898.00 placeholder values are returned as NaN.
    """
    # Import the submodule explicitly: a bare `import urllib` does not
    # guarantee that `urllib.request` has been loaded.
    import urllib.request

    # Precios MME Banxico
    url = "https://www.banxico.org.mx/SieInternet/consultaSerieGrafica.do?s=SI744,CI38"

    # The context manager closes the HTTP response; the timeout keeps an
    # unresponsive server from blocking the caller forever.
    with urllib.request.urlopen(url, timeout=30) as response:
        payload = response.read()

    # Decode the body and parse it as JSON
    json_data = json.loads(str(payload, 'utf-8'))

    # Tabular data
    df = pd.DataFrame(json_data['valores'], columns=['fecha', 'MME'])

    # Replace values "-989898.00" related with NaN values
    prices = df['MME'].values
    df['MME'] = np.where(np.isclose(prices, -989898.00), np.nan, prices)

    # Set datetimeindex
    df['fecha'] = pd.to_datetime(df['fecha'], format='%Y-%m-%d')
    df.set_index('fecha', inplace=True)

    return df

References:

- https://stackoverflow.com/questions/53426787/how-to-replace-a-float-value-with-nan-in-pandas
- https://stackoverflow.com/questions/15138614/how-can-i-read-the-contents-of-an-url-with-python

# Data Viz

In [7]:
def highlight_cells(val, threshold: float = 0.6,
                    highlight_color: str = 'yellow',
                    base_color: str = '#C6E2E9'):
    """
    Conditional formatting based on correlation values.

    Args:
        - val: value to compare.
        - threshold: a value is highlighted when it is >= threshold or
          <= -threshold. Defaults to 0.6.
        - highlight_color: background for highlighted values.
        - base_color: background for all other values (pastel blue by
          default). NaN never satisfies either comparison and gets this color.
    Returns:
        - CSS declaration string 'background-color: <color>'.
    """
    is_strong = val >= threshold or val <= -threshold
    color = highlight_color if is_strong else base_color

    return f'background-color: {color}'

In [8]:
# Matplotlib colors table 
# https://matplotlib.org/stable/gallery/color/named_colors.html

def plot_colortable(colors, sort_colors=True, emptycols=0):
    """
    Draw a table of color swatches, each labelled with its name.

    :param colors: mapping of color name to color value, e.g.
        matplotlib.colors.CSS4_COLORS.
    :param sort_colors: when exactly True, entries are ordered by hue,
        saturation, value and name; otherwise the mapping order is kept.
    :param emptycols: number of the four columns left out when computing how
        many rows are needed.
    :return: Matplotlib figure with one swatch per entry of ``colors``.
    """
    # Layout constants.
    cell_w, cell_h = 212, 22
    swatch_w = 48
    pad = 12
    dpi = 72

    if sort_colors is True:
        def hsv_then_name(name):
            # Sort key: HSV triple first, color name as tie-breaker.
            return tuple(mcolors.rgb_to_hsv(mcolors.to_rgb(colors[name]))), name

        names = sorted(colors, key=hsv_then_name)
    else:
        names = list(colors)

    # Number of rows needed to fit every name into the usable columns.
    ncols = 4 - emptycols
    full_rows, leftover = divmod(len(names), ncols)
    nrows = full_rows + int(leftover > 0)

    # The canvas is always four cells wide, whatever `emptycols` is.
    fig_w = cell_w * 4 + 2 * pad
    fig_h = cell_h * nrows + 2 * pad

    fig, ax = plt.subplots(figsize=(fig_w / dpi, fig_h / dpi), dpi=dpi)
    fig.subplots_adjust(pad / fig_w, pad / fig_h,
                        (fig_w - pad) / fig_w, (fig_h - pad) / fig_h)
    ax.set_xlim(0, cell_w * 4)
    # Inverted y-limits: row 0 is drawn at the top.
    ax.set_ylim(cell_h * (nrows - 0.5), -cell_h / 2.)
    ax.yaxis.set_visible(False)
    ax.xaxis.set_visible(False)
    ax.set_axis_off()

    for position, name in enumerate(names):
        # Fill column by column, top to bottom.
        col, row = divmod(position, nrows)
        y = row * cell_h
        left = cell_w * col

        ax.text(left + swatch_w + 7, y, name, fontsize=14,
                horizontalalignment='left',
                verticalalignment='center')

        swatch = Rectangle(xy=(left, y - 9), width=swatch_w,
                           height=18, facecolor=colors[name], edgecolor='0.7')
        ax.add_patch(swatch)

    return fig

In [9]:
def plot_correlation(color, title, df={}):
    """
    Draw a heatmap of the pairwise correlations (``df.corr()``) between columns.

    Only the cells below the diagonal are shown; the diagonal and the upper
    triangle are masked out.

    :param color: cmap heatmap option. i.e. "Blues", "coolwarm", "jet", etc.
    :param title: plot title.
    :param df: Pandas DataFrame.
    :returns: the ``(figure, axes)`` tuple created by ``plt.subplots``.
    """
    correlations = df.corr()

    # True marks a hidden cell: everything on or above the diagonal.
    hidden_cells = np.triu(np.ones(correlations.shape, dtype=bool))

    figure_and_axes = plt.subplots(figsize=(14, 8))

    heatmap_options = {
        'mask': hidden_cells,
        'annot': True,
        'cmap': color,
        'linewidths': 1,
        'center': 0,
    }
    sns.heatmap(correlations, **heatmap_options)
    plt.title(title)
    plt.tight_layout()

    return figure_and_axes

In [10]:
def seasonal_decomposition(df, select_period, model='additive'):
    """
    Run ``seasonal_decompose`` on a time series and plot the result.

    Note that the function changes the global matplotlib rcParams (label
    sizes, text color and default figure size).

    :param df: Timeseries data passed to ``seasonal_decompose`` as ``x``.
    :param select_period: period for seasonal decompose, i.e., 12 means monthly seasonal, 3 means quarter
    :param model: seasonal decompose model, i.e., additive
    :return: plot figure seasonal decompose (matplotlib figure).
    """
    style_settings = (
        ('axes.labelsize', 14),
        ('xtick.labelsize', 12),
        ('ytick.labelsize', 12),
        ('text.color', 'k'),
    )
    for setting, value in style_settings:
        matplotlib.rcParams[setting] = value

    rcParams['figure.figsize'] = (18, 8)

    fig = seasonal_decompose(x=df, model=model, period=select_period).plot()

    plt.tight_layout()

    return fig

In [11]:
def pca_data_preprocessing(df={}):
    """Standardize the data with StandardScaler ahead of PCA.

    Args:
        df(DataFrame): Numeric information in original scale.

    Returns:
        Pandas DataFrame holding the scaled values under the original column
        names. The row index of ``df`` is not carried over; the result uses
        a fresh default index.
    """
    scaled_values = StandardScaler().fit_transform(df)

    return pd.DataFrame(scaled_values, columns=df.columns)

In [12]:
def pca_pipeline_viz(df, xlabel: str, ylabel: str, title: str):
    """
    Plot the cumulative explained variance of a PCA (elbow method).

    The data is standardized and decomposed inside a scikit-learn pipeline;
    the curve helps to choose the number of principal components.

    :param df: Pandas DataFrame
    :param xlabel: X label legend for plot xaxis.
    :param ylabel: Y label legend for plot yaxis.
    :param title: Legend for plot title.
    :return: tuple ``(var, fig)``: the cumulative explained-variance ratios
        and the Matplotlib figure of the elbow plot.
    """
    stages = [
        ('scaler', StandardScaler()),
        ('dr', PCA()),
    ]
    fitted = Pipeline(stages).fit(df)

    # Running total of the variance share explained by each component.
    var = np.cumsum(fitted.named_steps['dr'].explained_variance_ratio_)

    fig, ax = plt.subplots(figsize=(12, 10))
    ax.plot(var, marker='o')

    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    plt.show()

    return var, fig

In [13]:
def plot_dendrogram(df, xlabel: str, ylabel: str, title: str, y_top: float, y_base: float):
    """Hierarchical clustering dendrogram figure (Ward linkage).
    Visual tool that helps to choose the number of cluster for an unsupervised model.
    :param df: Pandas DataFrame
    :param xlabel: X label legend for plot xaxis.
    :param ylabel: Y label legend for plot yaxis.
    :param title: Legend for plot title.
    :param y_top: height of the upper dashed reference line.
    :param y_base: height of the lower dashed reference line.
    :return: Matplotlib figure to visualize Dendrogram method for clustering models.
    """
    fig, ax = plt.subplots(figsize=(15, 10))

    # Drawn on the current axes, i.e. the ones just created.
    merge_tree = sch.linkage(df, method='ward')
    sch.dendrogram(merge_tree)

    # Two dashed guides at the requested heights.
    for height in (y_top, y_base):
        ax.axhline(y=height, c='grey', lw=1, linestyle='dashed')

    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    ax.set_xticks([])  # hide the x tick labels
    plt.show()

    return fig

In [14]:
def plot_silhoutte_coeff(array, df, xlabel: str, ylabel: str, title: str):
    """Silhouette plot to check the quality of predicted clusters.

    Each cluster is drawn as a band of horizontal bars, one bar per sample,
    sorted by silhouette value; a dashed red line marks the mean over all
    samples. Tick labels are the cluster labels plus one.

    :param array: cluster label of every row of ``df``.
    :param df: Pandas DataFrame
    :param xlabel: X label legend for plot xaxis.
    :param ylabel: Y label legend for plot yaxis.
    :param title: Legend for plot title.
    :return: the ``(figure, axes)`` tuple created by ``plt.subplots``.
    """
    cluster_labels = np.unique(array)
    n_clusters = cluster_labels.shape[0]
    silhouette_vals = silhouette_samples(df, array, metric='euclidean')

    figure_and_axes = plt.subplots(figsize=(15, 10))

    band_centers = []
    band_start = 0
    for position, label in enumerate(cluster_labels):
        members = np.sort(silhouette_vals[array == label])
        band_end = band_start + len(members)
        band_color = cm.jet(float(position) / n_clusters)
        plt.barh(range(band_start, band_end), members, height=1,
                 edgecolor='none', color=band_color)
        # The tick sits in the middle of the cluster's band.
        band_centers.append((band_start + band_end) / 2.)
        band_start = band_end

    plt.axvline(silhouette_vals.mean(), color='red', linestyle="--")
    plt.yticks(band_centers, cluster_labels + 1)

    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)

    plt.show()

    return figure_and_axes

# Data Preprocess

In [15]:
def split_data(df, feature: str, year_train: str, year_test: str):
    """
    Split one column into train and test sets by label slicing on the index.

    Meant for the SARIMAX model; make sure the index is a DatetimeIndex.

    :param df: pandas DataFrame.
    :param feature: Feature to split.
    :param year_train: last label of the training slice, i.e. '2014'
    :param year_test: first label of the testing slice, i.e. '2015'
    :returns: tuple ``(train, test)`` of single-column DataFrames.
    """
    selected = df[[feature]]

    return selected.loc[:year_train], selected.loc[year_test:]