In [1]:
# Import Libraries

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd
import matplotlib.pyplot as plt
from factor_analyzer import FactorAnalyzer
from factor_analyzer.factor_analyzer import calculate_bartlett_sphericity
from factor_analyzer.factor_analyzer import calculate_kmo
import pingouin as pg
import seaborn as sns
import numpy as np
import statsmodels.api as sm # biblioteca de modelagem estatística
from statsmodels.iolib.summary2 import summary_col # comparação entre modelos
from scipy.stats import pearsonr # correlações de Pearson

In [2]:
# Create a Spark Session
spark = SparkSession.builder.appName("EDA").getOrCreate()

In [3]:
# Load the Data
pandasdf = pd.read_excel('INMET_SE_SP_A771_2022.xlsx')
df = spark.createDataFrame(pandasdf)

In [4]:
# Basic Data Exploration
print('Columns overview')
pd.DataFrame(df.dtypes, columns = ['Column Name','Data type'])

Columns overview


Unnamed: 0,Column Name,Data type
0,Data,timestamp
1,Hora UTC,string
2,PRECIPITAÇÃO TOTAL HORÁRIO (mm),double
3,PRESSAO ATMOSFERICA AO NIVEL DA ESTACAO HORARI...,double
4,PRESSÃO ATMOSFERICA MAX NA HORA ANT (AUT) (mB),double
5,PRESSÃO ATMOSFERICA MIN NA HORA ANT (AUT) (mB),double
6,RADIACAO GLOBAL (Kj/m²),double
7,TEMPERATURA DO AR - BULBO SECO HORARIA (°C),double
8,TEMPERATURA DO PONTO DE ORVALHO (°C),double
9,TEMPERATURA MÁXIMA NA HORA ANT (AUT) (°C),double


In [None]:
# Finding the missing values

# Get the column names
column_names = df.columns

# Get the numeric_columns
numeric_columns = column_names[2:]

missing_values = {} 
for index, column in enumerate(df.columns):
    if column in numeric_columns:  # check zeroes, None, NaN
        missing_count = df.where(col(column).isin([0,None,np.nan])).count()
        missing_values.update({column:missing_count})

missing_df = pd.DataFrame.from_dict([missing_values])
missing_df

In [None]:
# Remove rows with missing values
df = df.dropna()

In [None]:
# Drop the 'Hora UTC' column
df = df.drop("Hora UTC")

In [None]:
# Run describe() on the PySpark DataFrame
describe_result = df.describe()

# Convert the result to a Pandas DataFrame with custom column names
pandas_df_describe = describe_result.toPandas().rename(columns={'summary': 'Column Name', 'col1': 'Data type'})

# Display the Pandas DataFrame
pandas_df_describe


In [None]:
# Avarage values for each month
from pyspark.sql.functions import month, year, avg

# Convert the 'Data' column to a date type if it's not already
df = df.withColumn("Data", df["Data"].cast("date"))

# Extract the month and year from the 'Data' column
df = df.withColumn("Month", month(df["Data"]))
df = df.withColumn("Year", year(df["Data"]))

# Get the numeric_columns
numeric_columns = [column for column in df.columns if column != "Data" and column != "Month" and column != "Year"]

# Create a list to store the aggregation expressions
agg_exprs = [avg(column).alias(f"Avg_{column}") for column in numeric_columns]

# Group by year and month, and calculate the average of numeric columns
grouped_df = df.groupBy("Year", "Month").agg(*agg_exprs)

# Show the resulting DataFrame
grouped_df_pd = grouped_df.toPandas()

grouped_df_pd

In [None]:
# Data Vizualisation: Temperature through 2022

# Select the relevant columns
temp_date_df = df.select("Data", "TEMPERATURA MÁXIMA NA HORA ANT (AUT) (°C)", "TEMPERATURA MÍNIMA NA HORA ANT (AUT) (°C)")

# Convert to Pandas DataFrame
temp_date_df_pd = temp_date_df.toPandas()

# Create the chart
plt.figure(figsize=(16, 10))

plt.plot(temp_date_df_pd["Data"], temp_date_df_pd["TEMPERATURA MÁXIMA NA HORA ANT (AUT) (°C)"], label="Max Temperature (°C)", color="red")
plt.plot(temp_date_df_pd["Data"], temp_date_df_pd["TEMPERATURA MÍNIMA NA HORA ANT (AUT) (°C)"], label="Min Temperature (°C)", color="blue")

plt.xlabel("Date")
plt.ylabel("Temperature (°C)")
plt.title("Temperature Over the Year")
plt.legend()

plt.xticks(rotation=45)
plt.tight_layout()

plt.show()

In [None]:
# Applying PCA to the dataframe filtered by the month of August

# Filter the rows where the month is 8
august_df = df.filter(month("Data") == 8)

# Show the resulting DataFrame
august_df_pd = august_df.toPandas()

august_df_pd.head()

In [None]:
# Get the number of rows
num_rows = august_df.count()

# Get the list of column names
column_names = august_df.columns

# Display the dimensions
print(f"Number of Rows: {num_rows}")
print(f"Number of Columns: {len(column_names)}")

In [None]:
column_names

In [None]:
# List the columns you want to calculate the averages for
columns_to_average = [
    ('PRESSÃO ATMOSFERICA MAX NA HORA ANT  (AUT) (mB)', 'PRESSÃO ATMOSFERICA MIN NA HORA ANT (AUT) (mB)'),
    ('TEMPERATURA MÁXIMA NA HORA ANT (AUT) (°C)', 'TEMPERATURA MÍNIMA NA HORA ANT (AUT) (°C)'),
    ('TEMPERATURA ORVALHO MAX NA HORA ANT (AUT) (°C)', 'TEMPERATURA ORVALHO MIN NA HORA ANT (AUT) (°C)'),
    ('UMIDADE REL MAX NA HORA ANT (AUT) (%)', 'UMIDADE REL MIN NA HORA ANT (AUT) (%)'),
    ('VENTO RAJADA MAXIMA (m/s)', 'VENTO VELOCIDADE HORARIA (m/s)')
]

# Create a new DataFrame to store the averages
aug_avg_df_pd = pd.DataFrame()

# Calculate and store the averages for each column pair
for col1, col2 in columns_to_average:
    average_col_name = f'AVERAGE_{col1}_{col2}'  # Create a new column name
    aug_avg_df_pd[average_col_name] = (august_df_pd[col1] + august_df_pd[col2]) / 2

# Select the columns you want to keep from the original DataFrame
columns_to_keep = ['PRECIPITAÇÃO TOTAL HORÁRIO (mm)',
                   'RADIACAO GLOBAL (Kj/m²)']

# Select those columns from the original DataFrame
selected_columns = august_df_pd[columns_to_keep]

# Concatenate the selected columns with the DataFrame containing averages
aug_df_pd = pd.concat([selected_columns, aug_avg_df_pd], axis=1)

# Show de df
aug_df_pd.head()

In [None]:
# Performing PCA 

# Matrix of correlations between variables
matriz_corr = pg.rcorr(aug_df_pd, method = 'pearson', upper = 'pval', decimals = 4, pval_stars = {0.01: '***', 0.05: '**', 0.10: '*'})
matriz_corr

In [None]:
# Another way to plot the same information

corr = aug_df_pd.corr()

f, ax = plt.subplots(figsize=(11, 9))

mask = np.triu(np.ones_like(corr, dtype=bool))

cmap = sns.diverging_palette(230, 20, n=256, as_cmap=True)

sns.heatmap(aug_df_pd.corr(), 
            mask=mask, 
            cmap=cmap, 
            vmax=1, 
            vmin = -.25,
            center=0,
            square=True, 
            linewidths=.5,
            annot = True,
            fmt='.3f', 
            annot_kws={'size': 16},
            cbar_kws={"shrink": .75})

plt.title('Matriz de correlação')
plt.tight_layout()
ax.tick_params(axis = 'x', labelsize = 14)
ax.tick_params(axis = 'y', labelsize = 14)
ax.set_ylim(len(corr))

plt.show()

In [None]:
# Bartlett test
bartlett, p_value = calculate_bartlett_sphericity(aug_df_pd)
print(f'Bartlett statistic: {bartlett}')
print(f'p-value : {p_value}')

# KMO Statisticskmo_all
kmo_model = calculate_kmo(aug_df_pd)
print(f'kmo_model : {kmo_model}')

In [None]:
# Defining the PCA (preliminary procedure)
fa = FactorAnalyzer()
fa.fit(aug_df_pd)

# Getting the Eigenvalues ​​(eigenvalues)
ev, v = fa.get_eigenvalues()
print(ev)

In [None]:
# Kaiser Criterion

# Check eigenvalues ​​with values ​​greater than 1
# There are three components above 1

# Parameterizing the PCA for two factors (eigenvalues ​​> 1)
fa.set_params(n_factors = 3, method = 'principal', rotation = None)
fa.fit(aug_df_pd)

# Eigenvalues, variances and accumulated variances
eigen_fatores = fa.get_factor_variance()
eigen_fatores

eigen_table = pd.DataFrame(eigen_fatores)
eigen_table.columns = [f"Fator {i+1}" for i, v in enumerate(eigen_table.columns)]
eigen_table.index = ['Eigenvalue','Variance', 'Accumulated Variance']
eigen_table = eigen_table.T

eigen_table

In [None]:
# Determining factor loadings
factor_loads = fa.loadings_

load_table = pd.DataFrame(factor_loads)
load_table.columns = [f"Fator {i+1}" for i, v in enumerate(load_table.columns)]
load_table.index = aug_df_pd.columns
load_table

In [None]:
# Determining commonalities
commonalities = fa.get_communalities()

commonalities_table = pd.DataFrame(commonalities)
commonalities_table.columns = ['Commonalities']
commonalities_table.index = aug_df_pd.columns
commonalities_table

In [None]:
# Factor results for the dataset observations (predict)
predict_fatores= pd.DataFrame(fa.transform(aug_df_pd))
predict_fatores.columns =  [f"Fator {i+1}" for i, v in enumerate(predict_fatores.columns)]

predict_fatores

In [None]:
# Adding to dataset
aug_df_pd_pf = pd.concat([aug_df_pd.reset_index(drop=True), predict_fatores], axis=1)

In [None]:
# Identifying factor scores
scores = fa.weights_

table_scores = pd.DataFrame(scores)
table_scores.columns = [f"Fator {i+1}" for i, v in enumerate(table_scores.columns)]
table_scores.index = aug_df_pd.columns
table_scores

In [None]:
# Correlation between factors

# Next, it appears that the correlation between the factors is zero (orthogonal)
corr_fator = pg.rcorr(aug_df_pd_pf[['Fator 1','Fator 2', 'Fator 3']], method = 'pearson', upper = 'pval', decimals = 4, pval_stars = {0.01: '***', 0.05: '**', 0.10: '*'})
corr_fator

In [None]:
# Creating a ranking
aug_df_pd_pf['Ranking'] = 0

for index, item in enumerate(list(eigen_table.index)):
    variancia = eigen_table.loc[item]['Variance']

    aug_df_pd_pf['Ranking'] = aug_df_pd_pf['Ranking'] + aug_df_pd_pf[eigen_table.index[index]]*variancia
    
aug_df_pd_pf.head()

In [None]:
# Chart of factor loadings and their variances in the main components
plt.figure(figsize=(12,8))

table_loads_chart = load_table.reset_index()

plt.scatter(table_loads_chart['Fator 1'], table_loads_chart['Fator 2'], s=30)

def label_point(x, y, val, ax):
    a = pd.concat({'x': x, 'y': y, 'val': val}, axis=1)
    for i, point in a.iterrows():
        ax.text(point['x'] + 0.05, point['y'], point['val'])

label_point(x = table_loads_chart['Fator 1'],
            y = table_loads_chart['Fator 2'],
            val = table_loads_chart['index'],
            ax = plt.gca()) 

plt.axhline(y=0, color='black', ls='--')
plt.axvline(x=0, color='black', ls='--')
plt.ylim([-1.5,1.5])
plt.xlim([-1.5,1.5])
plt.title(f"{eigen_table.shape[0]} main components that explain {(eigen_table['Variance'].round(2).sum())*100}% of variance", fontsize=14)
plt.xlabel(f"PC 1: {(eigen_table.iloc[0]['Variance'].round(2))*100}% of variance explained", fontsize=14)
plt.ylabel(f"PC 2: {(eigen_table.iloc[1]['Variance'].round(2))*100}% of variance explained", fontsize=14)
plt.show()

In [None]:
# Plot of the accumulated variance of the principal components
plt.figure(figsize=(12,8))

plt.title(f"{eigen_table.shape[0]} main components that explain {(eigen_table['Variance'].round(2).sum())*100}% of variance", fontsize=14)
sns.barplot(x=eigen_table.index, y=eigen_table['Variance'], data=eigen_table, color='green')
plt.xlabel("Main components", fontsize=14)
plt.ylabel("Percentage of variance explained", fontsize=14)
plt.show()