# Trabalho prático de Análise Inteligente em Sistemas de "Big Data"

Importar bibliotecas

In [2]:
import pandas as pd
import time
from pymongo import MongoClient
from pyspark.sql import SparkSession
from unidecode import unidecode

# Função para remover acentos de uma string
def remover_acentos(texto):
    return unidecode(texto)

Juntar os datasets e medir o tempo de execução com Python

In [17]:
# Measure exec time: Start point
start_time = time.time()

# Read the data from the CSV files
ds1 = pd.read_excel('dataset_input/efeito_estufa.xlsx')
ds2 = pd.read_excel('dataset_input/emissoes_gases.xlsx')
ds3 = pd.read_excel('dataset_input/intensidade_carbonica.xlsx')
ds4 = pd.read_excel('dataset_input/PIBcrescimento.xlsx')
ds5 = pd.read_excel('dataset_input/PIBdespesas.xlsx')
ds6 = pd.read_excel('dataset_input/PIBproducao.xlsx')
ds7 = pd.read_excel('dataset_input/PIBrendimento.xlsx')

ds1 = ds1.add_suffix('_efeito_estufa')
ds1.rename(columns={'Anos_efeito_estufa': 'anos'}, inplace=True)

ds2.rename(columns={'Anos': 'anos'}, inplace=True)

ds3 = ds3.add_suffix('_intensidade_carbonica')
ds3.rename(columns={'Anos_intensidade_carbonica': 'anos'}, inplace=True)

ds4 = ds4.add_suffix('_PIBcrescimento')
ds4.rename(columns={'Anos_PIBcrescimento': 'anos'}, inplace=True)

ds5 = ds5.add_suffix('_PIBdespesas')
ds5.rename(columns={'Anos_PIBdespesas': 'anos'}, inplace=True)

ds6 = ds6.add_suffix('_PIBproducao')
ds6.rename(columns={'Anos_PIBproducao': 'anos'}, inplace=True)

ds7 = ds7.add_suffix('_PIBrendimento')
ds7.rename(columns={'Anos_PIBrendimento': 'anos'}, inplace=True)

#Merge the data
dsAmbiente = pd.merge(pd.merge(ds1, ds2, on='anos'), ds3, on='anos')
dsPIB = pd.merge(pd.merge(ds4, ds5, on='anos'), pd.merge(ds6, ds7, on='anos'), on='anos')

# verify joined datasets
dsPIB.dropna(inplace=True)

dsAmbiente.dropna(inplace=True)

ds = pd.merge(dsAmbiente, dsPIB, on='anos')

ds.columns = map(remover_acentos, ds.columns.str.replace('das ', '').str.replace('de ', '').str.replace('e ', '').str.replace(',', '').str.replace(' ', '_').str.lower())

# Measure exec time: End point
end_time = time.time()

# Calculate execution time and show in stdout
exec_time = end_time - start_time
print(f"Execution time: {round(exec_time, 3)} seconds")

Execution time: 0.288 seconds


Data Visualization


In [18]:
ds.isnull().sum()


anos                                                                                                0
total_efeito_estufa                                                                                 0
total_atividades_economicas_efeito_estufa                                                           0
total_familias_efeito_estufa                                                                        0
dioxido_carbono_origem_fossil                                                                       0
dioxido_carbono_com_origem_na_biomassa                                                              0
oxido_nitroso                                                                                       0
metano                                                                                              0
amoniaco                                                                                            0
compostos_organicos_volateis_nao_metanosos                                        

In [11]:
ds.filter(like='total' , axis=1)

Unnamed: 0,total_efeito_estufa,total_atividades_economicas_efeito_estufa,total_familias_efeito_estufa,total_pibdespesas,total_valor_acrescentado_bruto_pibdespesas,total_pibproducao,total_pibrendimento
0,70470.1,61032.0,9438.1,89028.6,78452.6,89028.6,89028.6
1,68293.3,58328.4,9964.9,94351.6,82876.8,94351.6,94351.6
2,71402.6,61332.0,10070.6,102331.0,90046.3,102331.0,102331.0
3,76265.9,65634.9,10631.0,111353.4,97354.5,111353.4,111353.4
4,84465.0,73346.0,11119.0,119603.3,104225.1,119603.3,119603.3
5,83691.1,72187.7,11503.4,128414.4,112521.7,128414.4,128414.4
6,83571.0,71847.7,11723.3,135775.0,119098.2,135775.0,135775.0
7,87895.3,75788.6,12106.7,142554.3,124721.5,142554.3,142554.3
8,83113.3,71011.2,12102.1,146067.9,127734.3,146067.9,146067.9
9,86678.1,74536.8,12141.3,152248.4,133144.8,152248.4,152248.4


In [12]:
ds['total_pibproducao'].equals(ds['total_pibrendimento'])

True

In [13]:
ds['total_pibproducao'].equals(ds['total_pibdespesas'])

True

Pre-processing


In [19]:
ds.drop(columns=['total_pibproducao', 'total_pibrendimento'], inplace=True)

ds.rename(columns={'total_pibdespesas': 'total_pib'}, inplace=True)\

ds.to_csv('dataset_output/output.csv', index=False)

Utilizar o PySpark para medição do tempo de execução, com conversão do dataset para formato Parquet (*Nota: Aparente erro na função write*)

In [54]:
session = SparkSession.builder.appName("CSV to Parquet").getOrCreate()

df_spark = session.read.csv("dataset_output/output.csv", header=True, inferSchema=True)

df_spark.write.parquet("dataset_output/output.parquet") # Write pode não estar funcional

session.stop()

AnalysisException: [PATH_ALREADY_EXISTS] Path file:/c:/Users/Work/Documents/Aulas2023_24/sem2/BD/BigData/dataset_output/output.parquet already exists. Set mode as "overwrite" to overwrite the existing path.

Introdução do ficheiro ~~Parquet~~ (*not working*) CSV para uma base de dados (MongoDB)

In [6]:
# Connect to MongoDB

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/")
db = client["BigData"]
collection = db["data"]

df = pd.read_csv("dataset_output/output.csv")
dictionary = df.to_dict(orient="records")

collection.insert_many(dictionary)

InsertManyResult([ObjectId('6617bc99b8a60fe8dac962c6'), ObjectId('6617bc99b8a60fe8dac962c7'), ObjectId('6617bc99b8a60fe8dac962c8'), ObjectId('6617bc99b8a60fe8dac962c9'), ObjectId('6617bc99b8a60fe8dac962ca'), ObjectId('6617bc99b8a60fe8dac962cb'), ObjectId('6617bc99b8a60fe8dac962cc'), ObjectId('6617bc99b8a60fe8dac962cd'), ObjectId('6617bc99b8a60fe8dac962ce'), ObjectId('6617bc99b8a60fe8dac962cf'), ObjectId('6617bc99b8a60fe8dac962d0'), ObjectId('6617bc99b8a60fe8dac962d1'), ObjectId('6617bc99b8a60fe8dac962d2'), ObjectId('6617bc99b8a60fe8dac962d3'), ObjectId('6617bc99b8a60fe8dac962d4'), ObjectId('6617bc99b8a60fe8dac962d5'), ObjectId('6617bc99b8a60fe8dac962d6'), ObjectId('6617bc99b8a60fe8dac962d7'), ObjectId('6617bc99b8a60fe8dac962d8'), ObjectId('6617bc99b8a60fe8dac962d9'), ObjectId('6617bc99b8a60fe8dac962da'), ObjectId('6617bc99b8a60fe8dac962db'), ObjectId('6617bc99b8a60fe8dac962dc'), ObjectId('6617bc99b8a60fe8dac962dd'), ObjectId('6617bc99b8a60fe8dac962de'), ObjectId('6617bc99b8a60fe8dac962