# Projeto BIG DATA I

## Objetivo de projeto

Como queremos demostrar que de fato a solução proposta traz uma melhora, foi solicitado implementar uma análise comparativa de resultados usando a antiga abordagem (usando pandas) e usando a nova proposta de solução (pyspark). Para isso, tome em consideração o seguinte:

1. Escolha dois conjuntos de dados interessantes, sendo que um deles é pequeno (menos de 10.000 linhas) e o outro bem maior (acima de 1.000.000 linhas).

2. Aplique todas as etapas de ETL nos dois conjuntos de dados usando pandas y pyspark. As etapas incluem: (1) Extração dos dados, por exemplo de um csv, (2) Tratamento dos dados (limpeza, alteração de nomes de colunas, criação de mais tabelas, transformação nas colunas, etc.), e, (3) Carregamento dos dados (salvar a transformação feita sobre os dados).

3. Lembre que cada etapa tem que ser feita usando unicamente pandas ou pyspark.

4. Como o objetivo é fazer uma análise comparativa, tome em consideração o tempo que demora cada etapa, para depois facilitar as comparações.

Boa sorte e divirta-se!!

## Miscelâneas

### Instalando o PySpark no Google Colab
Instalar o PySpark não é um processo direto como de praxe em Python. Não basta usar um pip install apenas. Na verdade, antes de tudo é necessário instalar dependências como o **Java 8**, **Apache Spark 2.3.2** junto com o **Hadoop 2.7**.

In [None]:
# instalar as dependências
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download Spark
!wget -q https://dlcdn.apache.org/spark/spark-3.2.3/spark-3.2.3-bin-hadoop3.2.tgz

# Unzip the file
!tar xf spark-3.2.3-bin-hadoop3.2.tgz

# Install library for finding Spark
!pip install -q findspark

A próxima etapa é configurar as variáveis de ambiente, pois isso habilita o ambiente do Colab a identificar corretamente onde as dependências estão rodando.

Para conseguir “manipular” o terminal e interagir como ele, você pode usar a biblioteca os.

### Set up the environment for Spark.

In [None]:
# Configurar as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.3-bin-hadoop3.2"

### Import the library for locating Spark.

In [None]:
import findspark  # Initiate findspark

findspark.init()  # Check the location for Spark
findspark.find()

'/content/spark-3.2.3-bin-hadoop3.2'

### Start a Spark session, and check the session information.

In [None]:
# Import SparkSession
from pyspark.sql import SparkSession# Create a Spark Session
spark = SparkSession.builder.master("local[*]").getOrCreate()# Check Spark Session Information
spark

## Importar arquivos csv do GitHub

In [None]:
!wget https://github.com/GuhBrando/proj_Big_Data_I/raw/main/datasets/Fraudulent_Transactions_Data.csv
!wget https://github.com/GuhBrando/proj_Big_Data_I/raw/main/datasets/Netflix_movies.csv

--2023-02-13 22:48:12--  https://github.com/GuhBrando/proj_Big_Data_I/raw/main/datasets/Fraudulent_Transactions_Data.csv
Resolving github.com (github.com)... 140.82.114.4
Connecting to github.com (github.com)|140.82.114.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://media.githubusercontent.com/media/GuhBrando/proj_Big_Data_I/main/datasets/Fraudulent_Transactions_Data.csv [following]
--2023-02-13 22:48:13--  https://media.githubusercontent.com/media/GuhBrando/proj_Big_Data_I/main/datasets/Fraudulent_Transactions_Data.csv
Resolving media.githubusercontent.com (media.githubusercontent.com)... 185.199.110.133, 185.199.108.133, 185.199.111.133, ...
Connecting to media.githubusercontent.com (media.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 493534783 (471M) [text/plain]
Saving to: ‘Fraudulent_Transactions_Data.csv’


2023-02-13 22:48:32 (216 MB/s) - ‘Fraudulent_Transactions_Data.csv

## Pandas vs PySpark

### Importando bibliotecas

In [None]:
import pandas as pd
from pyspark.sql import SparkSession
from pandas import DataFrame
from pyspark.sql import DataFrame
import time

# Gerenciamento de arquivos
import os
import shutil


### Comparação de tempos para leitura dos arquivos csv

In [None]:
#definições
num_casas = 4

# Iniciando a sessão do Spark
spark = SparkSession.builder.appName("Pandas_vs_PySpark").getOrCreate()

# Medindo o tempo de leitura dos arquivos csv usando pandas
start_time = time.time()
df_Netflix_pandas = pd.read_csv("Netflix_movies.csv")
df_Fraud_pandas = pd.read_csv("Fraudulent_Transactions_Data.csv")
pandas_time = time.time() - start_time

# Medindo o tempo de leitura dos arquivos csv usando PySpark
start_time = time.time()
df_Netflix_spark = spark.read.csv("Netflix_movies.csv", header=True, inferSchema=True)
df_Fraud_spark = spark.read.csv("Fraudulent_Transactions_Data.csv", header=True, inferSchema=True)
pyspark_time = time.time() - start_time

# Exibindo os tempos de execução
print("Tempo de leitura dos arquivos csv com Pandas: %s segundos" % round(pandas_time, num_casas))
print("Tempo de leitura dos arquivos csv com PySpark: %s segundos" % round(pyspark_time, num_casas))

# Verificando qual foi mais rápido
if pandas_time < pyspark_time:
    print("Pandas foi mais rápido na leitura dos arquivos csv")
else:
    print("PySpark foi mais rápido na leitura dos arquivos csv")

print("\n")

# Conversão para Parquet
# Verifica e exclui arquivos parquet anteriores

if os.path.exists('Netflix_movies.parquet'):
  shutil.rmtree('Netflix_movies.parquet')
if os.path.exists('Fraudulent_Transactions_Data.parquet'):
  shutil.rmtree('Fraudulent_Transactions_Data.parquet')

# Convertendo os arquivos csv para Parquet
start_time = time.time()
df_Netflix_spark.write.parquet('Netflix_movies.parquet')
df_Fraud_spark.write.parquet('Fraudulent_Transactions_Data.parquet')
convert_parquet_time = time.time() - start_time

# Exibindo o tempo de execução
print("Tempo de conversão dos arquivos para Parquet: %s segundos" % round(convert_parquet_time, num_casas))

# Iniciando a contagem de tempo para leitura do arquivo Parquet com Spark
start_time_parquet = time.time()
df_Netflix_parquet = spark.read.parquet('Netflix_movies.parquet')
df_Fraud_parquet = spark.read.parquet('Fraudulent_Transactions_Data.parquet')
parquet_time = time.time() - start_time_parquet

# Exibindo o tempo de execução
print("Tempo de leitura dos arquivos Parquet com Spark: %s segundos" % round(parquet_time, num_casas))

min_time = min(pandas_time, pyspark_time, parquet_time)

# Verificando qual foi mais rápido
if min_time == pandas_time:
  print("A biblioteca com menor tempo de processamento é o Pandas")
elif min_time == pyspark_time:
  print("A biblioteca com menor tempo de processamento é o PySpark")
else:
  print("A biblioteca com menor tempo de processamento é o Spark com Parquet")

Tempo de leitura dos arquivos csv com Pandas: 10.6843 segundos
Tempo de leitura dos arquivos csv com PySpark: 27.828 segundos
Pandas foi mais rápido na leitura dos arquivos csv


Tempo de conversão dos arquivos para Parquet: 37.6018 segundos
Tempo de leitura dos arquivos Parquet com Spark: 0.3337 segundos
A biblioteca com menor tempo de processamento é o Spark com Parquet


### Comparar o tempo de execução de algumas operações comuns usando pandas e PySpark

- Soma de coluna e output de valor unico
- Somas das linhas e inserir em uma nova coluna
- Schema estrela - em uma coluna
- Trocar nome das Colunas

###  Contando a quantidade de linhas do dataset

In [None]:

# Função para contar o número de linhas e tempo de processamento de três bibliotecas diferentes
def count_rows_and_time(dataset, df_pandas, df_spark, df_parquet):
    """
    Função que compara o tempo de processamento entre as bibliotecas Pandas, PySpark e Spark com Parquet ao contar as linhas de um determinado dataset.

    Parâmetros:
    dataset (str) : Nome do dataset a ser avaliado.
    df_pandas (pandas.DataFrame) : DataFrame do Pandas para o dataset informado.
    df_spark (pyspark.sql.DataFrame) : DataFrame do PySpark para o dataset informado.
    df_parquet (pyspark.sql.DataFrame) : DataFrame do Spark com Parquet para o dataset informado.

    Saídas:
    Imprime na tela o número de linhas de cada biblioteca para o dataset informado e o tempo de processamento de contar as linhas. Além disso, imprime a biblioteca com menor tempo de processamento para o dataset informado.
    """
    #numero de casas de arredondamento
    num_casas = 5
    
    start_time = time.time()
    pandas_rows = df_pandas[df_pandas.columns[1]].count()
    pandas_count_time = time.time() - start_time

    start_time = time.time()
    pyspark_rows = df_spark.count()
    pyspark_count_time = time.time() - start_time
    
    start_time = time.time()
    spark_rows = df_parquet.count()
    spark_count_time = time.time() - start_time

    # Exibindo as informações
    print("Quantidade de linhas do dataset %s usando pandas: %s" % (dataset, pandas_rows))
    print("Quantidade de linhas do dataset %s usando PySpark: %s" % (dataset, pyspark_rows))
    print("Quantidade de linhas do dataset %s usando Spark: %s" % (dataset, spark_rows))
    print("\n")
    print("Tempo de contar as linhas do dataset %s usando pandas: %s segundos" % (dataset, round(pandas_count_time, num_casas)))
    print("Tempo de contar as linhas do dataset %s usando PySpark: %s segundos" % (dataset, round(pyspark_count_time, num_casas)))
    print("Tempo de contar as linhas do dataset %s usando Spark: %s segundos" % (dataset, round(spark_count_time, num_casas)))
    print("\n")

    # Encontrando a biblioteca com menor tempo de processamento
    min_time = min(pandas_count_time, pyspark_count_time, spark_count_time)
    if min_time == pandas_count_time:
        print("A biblioteca com menor tempo de processamento para o dataset %s é o Pandas" % dataset)
    elif min_time == pyspark_count_time:
        print("A biblioteca com menor tempo de processamento para o dataset %s é o PySpark" % dataset)
    else:
        print("A biblioteca com menor tempo de processamento para o dataset %s é o Spark com Parquet" % dataset)
    print("\n")

# aplicando a função para Netflix_Movie_Data
count_rows_and_time("Netflix_Movie_Data", df_Netflix_pandas, df_Netflix_spark, df_Netflix_parquet)

# aplicando a função para Fraudulent_Transactions_Data
count_rows_and_time("Fraudulent_Transactions_Data", df_Fraud_pandas, df_Fraud_spark, df_Fraud_parquet)



Quantidade de linhas do dataset Netflix_Movie_Data usando pandas: 3323
Quantidade de linhas do dataset Netflix_Movie_Data usando PySpark: 3323
Quantidade de linhas do dataset Netflix_Movie_Data usando Spark: 3323


Tempo de contar as linhas do dataset Netflix_Movie_Data usando pandas: 0.00475 segundos
Tempo de contar as linhas do dataset Netflix_Movie_Data usando PySpark: 1.19152 segundos
Tempo de contar as linhas do dataset Netflix_Movie_Data usando Spark: 1.08273 segundos


A biblioteca com menor tempo de processamento para o dataset Netflix_Movie_Data é o Pandas


Quantidade de linhas do dataset Fraudulent_Transactions_Data usando pandas: 6362620
Quantidade de linhas do dataset Fraudulent_Transactions_Data usando PySpark: 6362620
Quantidade de linhas do dataset Fraudulent_Transactions_Data usando Spark: 6362620


Tempo de contar as linhas do dataset Fraudulent_Transactions_Data usando pandas: 1.49131 segundos
Tempo de contar as linhas do dataset Fraudulent_Transactions_Data usando P

### Renomear colunas dos Datasets

#### Dicionário e Funções:

In [None]:
# Nome para as colunas dos datasets
TRANSACTION_COL_DICT = {
    "step": "time_step",
    "type": "transaction_type",
    "amount": "amount",
    "nameOrig": "originating_customer",
    "oldbalanceOrg": "originating_balance_before",
    "newbalanceOrig": "originating_balance_after",
    "nameDest": "recipient_customer",
    "oldbalanceDest": "recipient_balance_before",
    "newbalanceDest": "recipient_balance_after",
    "isFraud": "is_fraud",
    "isFlaggedFraud": "is_flagged_fraud"
}

NETFLIX_MOVIE_COL_DICT = {
"Unnamed: 0": "index",
"movie_name": "movie_name",
"Duration": "duration",
"year": "year_of_production",
"genre": "genre",
"director": "director",
"actors": "actors",
"country": "country_of_production",
"rating": "rating",
"enter_in_netflix": "date_of_entry"
}

In [None]:
# Função para renomear colunas com Pandas
def rename_cols_pandas(df: DataFrame, mapping_dict: dict) -> DataFrame:

    ''' 
    Função para renomear colunas de um dataframe usando a biblioteca Pandas.

    :param df: DataFrame de entrada
    :param mapping_dict: Dicionário com o mapeamento dos nomes das colunas
    :return: DataFrame de saída com as colunas renomeadas
    ''' 
    
    df.rename(columns=mapping_dict, inplace=True)
    return df
    
# Função para renomear colunas com PySpark
def rename_cols_pyspark(df: DataFrame, mapping_dict: dict) ->DataFrame:
    # Rename all the columns
    '''
    Função para renomear colunas de um dataframe usando a biblioteca PySpark.

    :param df: DataFrame de entrada
    :param mapping_dict: Dicionário com o mapeamento dos nomes das colunas
    :return: DataFrame de saída com as colunas renomeadas
    '''
    for key in mapping_dict.keys():
        df=df.withColumnRenamed(key,mapping_dict.get(key))
    return df

#### Comparações de tempo ao Renomear colunas

In [None]:
# 1. Rename Columns pandas

#definições
num_casas = 4

# Pandas
start_time = time.time()
df_Netflix_pandas = rename_cols_pandas(df_Netflix_pandas, NETFLIX_MOVIE_COL_DICT)
pandas_rename_time_Netflix = time.time() - start_time

start_time = time.time()
df_Fraud_pandas = rename_cols_pandas(df_Fraud_pandas, TRANSACTION_COL_DICT)
pandas_rename_time_Fraud = time.time() - start_time

# PySpark csv
start_time = time.time()
df_Netflix_spark = rename_cols_pyspark(df_Netflix_spark, NETFLIX_MOVIE_COL_DICT)
pyspark_rename_time_Netflix = time.time() - start_time

start_time = time.time()
df_Fraud_spark = rename_cols_pyspark(df_Fraud_spark, TRANSACTION_COL_DICT)
pyspark_time_Fraud = time.time() - start_time

# PySpark parquet
start_time = time.time()
df_Netflix_parquet = rename_cols_pyspark(df_Netflix_parquet, NETFLIX_MOVIE_COL_DICT)
spark_rename_time_Netflix = time.time() - start_time

start_time = time.time()
df_Fraud_parquet = rename_cols_pyspark(df_Fraud_parquet, TRANSACTION_COL_DICT)
spark_time_Fraud = time.time() - start_time

# Exibindo os tempos de renomeação de colunas
print("Tempo de renomear as colunas do dataset Netflix_Movie_Data usando Pandas: %s segundos" % round(pandas_rename_time_Netflix, num_casas))
print("Tempo de renomear as colunas do dataset Netflix_Movie_Data usando PySpark: %s segundos" % round(pyspark_rename_time_Netflix, num_casas))
print("Tempo de renomear as colunas do dataset Netflix_Movie_Data usando Spark com Parquet: %s segundos" % round(spark_rename_time_Netflix, num_casas))

# Verificando qual foi mais rápido para o dataset Netflix_Movie_Data
min_time = min(pandas_rename_time_Netflix, pyspark_rename_time_Netflix, spark_rename_time_Netflix)

if min_time == pandas_rename_time_Netflix:
  print("A biblioteca com menor tempo de processamento para o dataset Netflix_Movie_Data é o Pandas")
elif min_time == pyspark_rename_time_Netflix:
  print("A biblioteca com menor tempo de processamento para o dataset Netflix_Movie_Data é o PySpark")
else:
  print("A biblioteca com menor tempo de processamento para o dataset Netflix_Movie_Data é o Spark com Parquet")

print("\n")

print("Tempo de renomear as colunas do dataset Fraudulent_Transactions_Data usando Pandas: %s segundos" % round(pandas_rename_time_Fraud, num_casas))
print("Tempo de renomear as colunas do dataset Fraudulent_Transactions_Data usando PySpark: %s segundos" % round(pyspark_time_Fraud, num_casas))
print("Tempo de renomear as colunas do dataset Fraudulent_Transactions_Data usando Spark com Parquet: %s segundos" %round(spark_time_Fraud, num_casas))
# Verificando qual foi mais rápido para o dataset Fraudulent_Transactions_Data
min_time = min(pandas_rename_time_Fraud, pyspark_time_Fraud, spark_time_Fraud)

if min_time == pandas_rename_time_Fraud:
  print("A biblioteca com menor tempo de processamento para o dataset Fraudulent_Transactions_Data é o Pandas")
elif min_time == pyspark_time_Fraud:
  print("A biblioteca com menor tempo de processamento para o dataset Fraudulent_Transactions_Data é o PySpark")
else:
  print("A biblioteca com menor tempo de processamento para o dataset Fraudulent_Transactions_Data é o Spark com Parquet")

Tempo de renomear as colunas do dataset Netflix_Movie_Data usando Pandas: 0.0012 segundos
Tempo de renomear as colunas do dataset Netflix_Movie_Data usando PySpark: 0.1663 segundos
Tempo de renomear as colunas do dataset Netflix_Movie_Data usando Spark com Parquet: 0.1278 segundos
A biblioteca com menor tempo de processamento para o dataset Netflix_Movie_Data é o Pandas


Tempo de renomear as colunas do dataset Fraudulent_Transactions_Data usando Pandas: 0.0006 segundos
Tempo de renomear as colunas do dataset Fraudulent_Transactions_Data usando PySpark: 0.1661 segundos
Tempo de renomear as colunas do dataset Fraudulent_Transactions_Data usando Spark com Parquet: 0.0849 segundos
A biblioteca com menor tempo de processamento para o dataset Fraudulent_Transactions_Data é o Pandas


###Soma de coluna e output de valor unico

Nesta etapa são comparadas o tempo de cálculo de um valor de coluna. 

- Para os dados do Netflix_movies vamos calcular a média do rating de todos os filmes. 
- Para os dados do Fraudulent_Transaction_Data vamos calcular o valor médio das transações fraudulentas. 

#### Rating medio do Netflix_Movie_Data

In [None]:
from pyspark.sql.functions import mean

# Função para calcular o valor médio do ranting 
def mean_rating_time(dataset, df_pandas, df_spark, df_parquet):
    """
    Função que compara o tempo de processamento entre as bibliotecas Pandas, PySpark e Spark com Parquet ao contar as linhas de um determinado dataset.

    Parâmetros:
    dataset (str) : Nome do dataset a ser avaliado.
    df_pandas (pandas.DataFrame) : DataFrame do Pandas para o dataset informado.
    df_spark (pyspark.sql.DataFrame) : DataFrame do PySpark para o dataset informado.
    df_parquet (pyspark.sql.DataFrame) : DataFrame do Spark com Parquet para o dataset informado.

    Saídas:
    Imprime na tela o número médio de cada biblioteca para o dataset informado e o tempo de processamento de contar as linhas. Além disso, imprime a biblioteca com menor tempo de processamento para o dataset informado.
    """
    #numero de casas de arredondamento
    num_casas = 5
    
    start_time = time.time()
    pandas_mean = df_pandas[df_pandas.columns[8]].count()
    pandas_mean_time = time.time() - start_time
    
    start_time = time.time()
    pyspark_mean = df_spark.select(mean('rating')).collect()
    results={}
    for i in pyspark_mean:
      results.update(i.asDict())
    pyspark_mean = results['avg(rating)']
    pyspark_mean_time = time.time() - start_time

    start_time = time.time()
    parquet_mean = df_parquet.select(mean('rating')).collect()
    results={}
    for i in parquet_mean:
      results.update(i.asDict())
    parquet_mean = results['avg(rating)']
    spark_mean_time = time.time() - start_time

    # Exibindo as informações
    print("Rating medio no %s usando pandas: %s" % (dataset, round(pandas_mean, num_casas)))
    print("Rating medio no %s usando pandas: %s" % (dataset, round(pyspark_mean, num_casas)))
    print("Rating medio no %s usando pandas: %s" % (dataset, round(parquet_mean, num_casas)))
    print("\n")
    print("Tempo de calculo da médias do dataset %s usando pandas: %s segundos" % (dataset, round(pandas_mean_time, num_casas)))
    print("Tempo de calculo da médias do dataset %s usando PySpark: %s segundos" % (dataset, round(pyspark_mean_time, num_casas)))
    print("Tempo de calculo da médias do dataset %s usando Spark com Parquet: %s segundos" % (dataset, round(spark_mean_time, num_casas)))
    print("\n")

    # Encontrando a biblioteca com menor tempo de processamento
    min_time = min(pandas_mean_time, pyspark_mean_time, spark_mean_time)
    if min_time == pandas_mean_time:
        print("A biblioteca com menor tempo de processamento para o dataset %s é o Pandas" % dataset)
    elif min_time == pyspark_mean_time:
        print("A biblioteca com menor tempo de processamento para o dataset %s é o PySpark" % dataset)
    else:
        print("A biblioteca com menor tempo de processamento para o dataset %s é o Spark com Parquet" % dataset)
    print("\n")

    

# aplicando a função para Netflix_Movie_Data
mean_rating_time("Netflix_Movie_Data", df_Netflix_pandas, df_Netflix_spark, df_Netflix_parquet)


Rating medio no Netflix_Movie_Data usando pandas: 3323
Rating medio no Netflix_Movie_Data usando pandas: 6.20093
Rating medio no Netflix_Movie_Data usando pandas: 6.20093


Tempo de calculo da médias do dataset Netflix_Movie_Data usando pandas: 0.00034 segundos
Tempo de calculo da médias do dataset Netflix_Movie_Data usando PySpark: 0.42063 segundos
Tempo de calculo da médias do dataset Netflix_Movie_Data usando Spark com Parquet: 0.29307 segundos


A biblioteca com menor tempo de processamento para o dataset Netflix_Movie_Data é o Pandas




#### Mean amount on Fraudulent_Transactions_Data

In [None]:
from pyspark.sql.functions import mean

# Função para calcular o valor médio do ranting 
def mean_amount_time(dataset, df_pandas, df_spark, df_parquet):
    """
    Função que compara o tempo de processamento entre as bibliotecas Pandas, PySpark e Spark com Parquet ao contar as linhas de um determinado dataset.

    Parâmetros:
    dataset (str) : Nome do dataset a ser avaliado.
    df_pandas (pandas.DataFrame) : DataFrame do Pandas para o dataset informado.
    df_spark (pyspark.sql.DataFrame) : DataFrame do PySpark para o dataset informado.
    df_parquet (pyspark.sql.DataFrame) : DataFrame do Spark com Parquet para o dataset informado.

    Saídas:
    Imprime na tela o número médio de cada biblioteca para o dataset informado e o tempo de processamento de contar as linhas. Além disso, imprime a biblioteca com menor tempo de processamento para o dataset informado.
    """
    #numero de casas de arredondamento
    num_casas = 5
    
    start_time = time.time()
    pandas_mean = df_pandas[df_pandas.columns[2]].count()
    pandas_mean_time = time.time() - start_time
    
    start_time = time.time()
    pyspark_mean = df_spark.select(mean('amount')).collect()
    results={}
    for i in pyspark_mean:
      results.update(i.asDict())
    pyspark_mean = results['avg(amount)']
    pyspark_mean_time = time.time() - start_time

    start_time = time.time()
    parquet_mean = df_parquet.select(mean('amount')).collect()
    results={}
    for i in parquet_mean:
      results.update(i.asDict())
    parquet_mean = results['avg(amount)']
    spark_mean_time = time.time() - start_time

    # Exibindo as informações
    print("Amount medio no %s usando pandas: %s" % (dataset, round(pandas_mean, num_casas)))
    print("Amount medio no %s usando pandas: %s" % (dataset, round(pyspark_mean, num_casas)))
    print("Amount medio no %s usando pandas: %s" % (dataset, round(parquet_mean, num_casas)))
    print("\n")
    print("Tempo de calculo da médias do dataset %s usando pandas: %s segundos" % (dataset, round(pandas_mean_time, num_casas)))
    print("Tempo de calculo da médias do dataset %s usando PySpark: %s segundos" % (dataset, round(pyspark_mean_time, num_casas)))
    print("Tempo de calculo da médias do dataset %s usando Spark com Parquet: %s segundos" % (dataset, round(spark_mean_time, num_casas)))
    print("\n")

    # Encontrando a biblioteca com menor tempo de processamento
    min_time = min(pandas_mean_time, pyspark_mean_time, spark_mean_time)
    if min_time == pandas_mean_time:
        print("A biblioteca com menor tempo de processamento para o dataset %s é o Pandas" % dataset)
    elif min_time == pyspark_mean_time:
        print("A biblioteca com menor tempo de processamento para o dataset %s é o PySpark" % dataset)
    else:
        print("A biblioteca com menor tempo de processamento para o dataset %s é o Spark com Parquet" % dataset)
    print("\n")

    
# aplicando a função para Fraudulent_Transactions_Data
mean_amount_time("Fraudulent_Transactions_Data", df_Fraud_pandas, df_Fraud_spark, df_Fraud_parquet)



Amount medio no Fraudulent_Transactions_Data usando pandas: 6362620
Amount medio no Fraudulent_Transactions_Data usando pandas: 179861.90355
Amount medio no Fraudulent_Transactions_Data usando pandas: 179861.90355


Tempo de calculo da médias do dataset Fraudulent_Transactions_Data usando pandas: 0.03034 segundos
Tempo de calculo da médias do dataset Fraudulent_Transactions_Data usando PySpark: 11.61385 segundos
Tempo de calculo da médias do dataset Fraudulent_Transactions_Data usando Spark com Parquet: 0.68237 segundos


A biblioteca com menor tempo de processamento para o dataset Fraudulent_Transactions_Data é o Pandas




### Contar distintos

#### Contar distintos netflix - Netflix_Movie_Data

In [None]:
from pyspark.sql.functions import mean
from pyspark.sql.functions import countDistinct

# Função para calcular o valor médio do ranting 
def diff_actor_time(dataset, df_pandas, df_spark, df_parquet):
    """
    Função que compara o tempo de processamento entre as bibliotecas Pandas, PySpark e Spark com Parquet ao contar as linhas diferentes
    Parâmetros:
    dataset (str) : Nome do dataset a ser avaliado.
    df_pandas (pandas.DataFrame) : DataFrame do Pandas para o dataset informado.
    df_spark (pyspark.sql.DataFrame) : DataFrame do PySpark para o dataset informado.
    df_parquet (pyspark.sql.DataFrame) : DataFrame do Spark com Parquet para o dataset informado.

    Saídas:
    Imprime na tela o número de linhas diferentes de cada biblioteca para o dataset informado e o tempo de processamento de contar as linhas. Além disso, imprime a biblioteca com menor tempo de processamento para o dataset informado.
    """
    #numero de casas de arredondamento
    num_casas = 5
    
    start_time = time.time()
    pandas_diff = len(df_pandas["actors"].unique())
    pandas_diff_time = time.time() - start_time
    
    start_time = time.time()
    pyspark_diff = df_spark.select(countDistinct('actors')).collect()
    results={}
    for i in pyspark_diff:
      results.update(i.asDict())
    pyspark_diff = results['count(DISTINCT actors)']
    pyspark_diff_time = time.time() - start_time

    start_time = time.time()
    spark_diff = df_parquet.select(countDistinct('actors')).collect()
    results={}
    for i in spark_diff:
      results.update(i.asDict())
    spark_diff = results['count(DISTINCT actors)']
    spark_diff_time = time.time() - start_time

    # Exibindo as informações
    print("Contagem diferentes actors no %s usando pandas: %s" % (dataset, round(pandas_diff, num_casas)))
    print("Contagem diferentes actors no %s usando pandas: %s" % (dataset, round(pyspark_diff, num_casas)))
    print("Contagem diferentes actors no %s usando pandas: %s" % (dataset, round(spark_diff, num_casas)))
    print("\n")
    print("Tempo de calculo da médias do dataset %s usando pandas: %s segundos" % (dataset, round(pandas_diff_time, num_casas)))
    print("Tempo de calculo da médias do dataset %s usando PySpark: %s segundos" % (dataset, round(pyspark_diff_time, num_casas)))
    print("Tempo de calculo da médias do dataset %s usando Spark com Parquet: %s segundos" % (dataset, round(spark_diff_time, num_casas)))
    print("\n")

    # Encontrando a biblioteca com menor tempo de processamento
    min_time = min(pandas_diff_time, pyspark_diff_time, spark_diff_time)
    if min_time == pandas_diff_time:
        print("A biblioteca com menor tempo de processamento para o dataset %s é o Pandas" % dataset)
    elif min_time == pyspark_diff_time:
        print("A biblioteca com menor tempo de processamento para o dataset %s é o PySpark" % dataset)
    else:
        print("A biblioteca com menor tempo de processamento para o dataset %s é o Spark com Parquet" % dataset)
    print("\n")

    

# aplicando a função para Netflix_Movie_Data
diff_actor_time("Netflix_Movie_Data", df_Netflix_pandas, df_Netflix_spark, df_Netflix_parquet)


Contagem diferentes actors no Netflix_Movie_Data usando pandas: 3237
Contagem diferentes actors no Netflix_Movie_Data usando pandas: 3237
Contagem diferentes actors no Netflix_Movie_Data usando pandas: 3237


Tempo de calculo da médias do dataset Netflix_Movie_Data usando pandas: 0.01248 segundos
Tempo de calculo da médias do dataset Netflix_Movie_Data usando PySpark: 2.28388 segundos
Tempo de calculo da médias do dataset Netflix_Movie_Data usando Spark com Parquet: 0.52489 segundos


A biblioteca com menor tempo de processamento para o dataset Netflix_Movie_Data é o Pandas




#### Contar distintos - Fraudulent_Transactions_Data

In [None]:
from pyspark.sql.functions import mean

# Função para calcular o valor médio do ranting 
def diff_nameOrig_time(dataset, df_pandas, df_spark, df_parquet):
    """
    Função que compara o tempo de processamento entre as bibliotecas Pandas, PySpark e Spark com Parquet ao contar as linhas diferentes
    Parâmetros:
    dataset (str) : Nome do dataset a ser avaliado.
    df_pandas (pandas.DataFrame) : DataFrame do Pandas para o dataset informado.
    df_spark (pyspark.sql.DataFrame) : DataFrame do PySpark para o dataset informado.
    df_parquet (pyspark.sql.DataFrame) : DataFrame do Spark com Parquet para o dataset informado.

    Saídas:
    Imprime na tela o número de linhas diferentes de cada biblioteca para o dataset informado e o tempo de processamento de contar as linhas. Além disso, imprime a biblioteca com menor tempo de processamento para o dataset informado.
    """
    #numero de casas de arredondamento
    num_casas = 5
    
    start_time = time.time()
    pandas_diff = len(df_pandas['originating_customer'].unique())
    pandas_diff_time = time.time() - start_time
    
    start_time = time.time()
    pyspark_diff = df_spark.select(countDistinct('originating_customer'))
    results={}
    for i in pyspark_diff:
      results.update(i.asDict())
    pyspark_diff = results['count(DISTINCT originating_customer)']
    pyspark_diff_time = time.time() - start_time

    start_time = time.time()
    spark_diff = df_parquet.select(countDistinct('originating_customer'))
    results={}
    for i in spark_diff:
      results.update(i.asDict())
    spark_diff = results['count(DISTINCT originating_customer)']
    spark_diff_time = time.time() - start_time

    # Exibindo as informações
    print("Contagem diferentes customer_origim no %s usando pandas: %s" % (dataset, round(pandas_diff, num_casas)))
    print("Contagem diferentes customer_origim no %s usando pandas: %s" % (dataset, round(pyspark_diff, num_casas)))
    print("Contagem diferentes customer_origim no %s usando pandas: %s" % (dataset, round(spark_diff, num_casas)))
    print("\n")
    print("Tempo de calculo da médias do dataset %s usando pandas: %s segundos" % (dataset, round(pandas_diff_time, num_casas)))
    print("Tempo de calculo da médias do dataset %s usando PySpark: %s segundos" % (dataset, round(pyspark_diff_time, num_casas)))
    print("Tempo de calculo da médias do dataset %s usando Spark com Parquet: %s segundos" % (dataset, round(spark_diff_time, num_casas)))
    print("\n")

    # Encontrando a biblioteca com menor tempo de processamento
    min_time = min(pandas_diff_time, pyspark_diff_time, spark_diff_time)
    if min_time == pandas_diff_time:
        print("A biblioteca com menor tempo de processamento para o dataset %s é o Pandas" % dataset)
    elif min_time == pyspark_diff_time:
        print("A biblioteca com menor tempo de processamento para o dataset %s é o PySpark" % dataset)
    else:
        print("A biblioteca com menor tempo de processamento para o dataset %s é o Spark com Parquet" % dataset)
    print("\n")

    

# aplicando a função para Netflix_Movie_Data
diff_nameOrig_time("Fraudulent_Transactions_Data", df_Fraud_pandas, df_Fraud_spark, df_Fraud_parquet)


TypeError: ignored