In [0]:
#utilizado para a demonstração da preparação dos dados

In [0]:
from pyspark.sql import SparkSession #importa a biblioteca que cria a seção do spark

In [0]:
#inicia a seção para a utilização do spark
spark = SparkSession.builder.appName("PreparacaoDados").getOrCreate() #cria a seção caso não exista ou obtém a já criada

In [0]:
%fs ls /FileStore/tables

path,name,size,modificationTime
dbfs:/FileStore/tables/Arquivos_JSON_pratica3_5.zip,Arquivos_JSON_pratica3_5.zip,22623719,1680983125000
dbfs:/FileStore/tables/Mall_Customers.csv,Mall_Customers.csv,4286,1680615760000
dbfs:/FileStore/tables/designation.json,designation.json,400,1680983001000
dbfs:/FileStore/tables/iris_bezdekIris-1.csv,iris_bezdekIris-1.csv,4551,1680528662000
dbfs:/FileStore/tables/iris_bezdekIris-2.csv,iris_bezdekIris-2.csv,4551,1680573607000
dbfs:/FileStore/tables/iris_bezdekIris.csv,iris_bezdekIris.csv,4551,1680312578000
dbfs:/FileStore/tables/metadata_features.csv,metadata_features.csv,809199167,1680730335000
dbfs:/FileStore/tables/regressaoLinear.csv,regressaoLinear.csv,564,1680312578000
dbfs:/FileStore/tables/salary.json,salary.json,361,1680983126000
dbfs:/FileStore/tables/temperature.csv,temperature.csv,13971171,1680983199000


In [0]:
diretorioDatasetOcup="/FileStore/tables/designation.json"  #diretório que contém o arquivo a ser utilizado

In [0]:
diretorioDatasetSala="/FileStore/tables/salary.json"  #diretório que contém o arquivo a ser utilizado

In [0]:
#criando um RDD e convertendo em Dataframe
empregados=spark.sparkContext.parallelize([(1, "Joao", 25), (2, "Ricardo", 35), (3, "Marcio", 24), \
                           (4, "Janete", 28), (5, "Kely", 26), (6, "Vicente", 35), \
                           (7, "Jander", 38), (8, "Maria", 32), (9, "Gabriel", 29), \
                           (10, "Kimberly", 29), (11, "Alex", 28), (12, "Gustavo", 25), \
                           (13, "Rafael", 31)]).toDF(["emp_id","nome","idade"])

In [0]:
empregados.show(5)

+------+-------+-----+
|emp_id|   nome|idade|
+------+-------+-----+
|     1|   Joao|   25|
|     2|Ricardo|   35|
|     3| Marcio|   24|
|     4| Janete|   28|
|     5|   Kely|   26|
+------+-------+-----+
only showing top 5 rows



In [0]:
#lendo arquivos armazenados em um JSON
salario = spark.read.json(diretorioDatasetSala)  #lê os dados do diretório

In [0]:
salario.show(5)

+----+-------+
|e_id|salario|
+----+-------+
|   1|  10000|
|   2|  12000|
|   3|  12000|
|   4|   null|
|   5|    120|
+----+-------+
only showing top 5 rows



In [0]:
ocupacao = spark.read.json(diretorioDatasetOcup) #lê os dados do diretório

In [0]:
ocupacao.show(5)

+---------+---+
|    cargo| id|
+---------+---+
|Associado|  1|
|  Gerente|  2|
|  Gerente|  3|
|Associado|  4|
|  Gerente|  5|
+---------+---+
only showing top 5 rows



Preparação dos dados

In [0]:
#consolidação dos dados
df_final = empregados.join(salario, empregados.emp_id == salario.e_id).join(ocupacao, empregados.emp_id == ocupacao.id).select("e_id", "nome", "idade", "cargo", "salario")

In [0]:
df_final.show()

+----+--------+-----+--------------+-------+
|e_id|    nome|idade|         cargo|salario|
+----+--------+-----+--------------+-------+
|   1|    Joao|   25|     Associado|  10000|
|   2| Ricardo|   35|       Gerente|  12000|
|   3|  Marcio|   24|       Gerente|  12000|
|   4|  Janete|   28|     Associado|   null|
|   5|    Kely|   26|       Gerente|    120|
|   6| Vicente|   35|Gerente Senior|  22000|
|   7|  Jander|   38|Gerente Senior|  20000|
|   8|   Maria|   32|       Gerente|  12000|
|   9| Gabriel|   29|       Gerente|  10000|
|  10|Kimberly|   29|     Associado|   8000|
|  11|    Alex|   28|       Gerente|  12000|
|  12| Gustavo|   25|       Gerente|  12000|
|  13|  Rafael|   31|       Gerente| 120000|
+----+--------+-----+--------------+-------+



Limpando os dados

In [0]:
#retirando os valores NaN
clean_data = df_final.na.drop()

In [0]:
clean_data.show()

+----+--------+-----+--------------+-------+
|e_id|    nome|idade|         cargo|salario|
+----+--------+-----+--------------+-------+
|   1|    Joao|   25|     Associado|  10000|
|   2| Ricardo|   35|       Gerente|  12000|
|   3|  Marcio|   24|       Gerente|  12000|
|   5|    Kely|   26|       Gerente|    120|
|   6| Vicente|   35|Gerente Senior|  22000|
|   7|  Jander|   38|Gerente Senior|  20000|
|   8|   Maria|   32|       Gerente|  12000|
|   9| Gabriel|   29|       Gerente|  10000|
|  10|Kimberly|   29|     Associado|   8000|
|  11|    Alex|   28|       Gerente|  12000|
|  12| Gustavo|   25|       Gerente|  12000|
|  13|  Rafael|   31|       Gerente| 120000|
+----+--------+-----+--------------+-------+



Substituindo o Nan pelo valor médio da coluna

In [0]:
#substituindo os valores NaN pela média
import math  #utilizado para aplicar algumas funções matematicas
from pyspark.sql import functions as F  #contem as funções da linguagem SQL

In [0]:
#encontrando a média dos salários
salario_medio = math.floor(salario.select(F.mean('salario')).collect()[0][0])
print(salario_medio)

20843


In [0]:
clean_data = df_final.na.fill({'salario' : salario_medio})

In [0]:
clean_data.show()

+----+--------+-----+--------------+-------+
|e_id|    nome|idade|         cargo|salario|
+----+--------+-----+--------------+-------+
|   1|    Joao|   25|     Associado|  10000|
|   2| Ricardo|   35|       Gerente|  12000|
|   3|  Marcio|   24|       Gerente|  12000|
|   4|  Janete|   28|     Associado|  20843|
|   5|    Kely|   26|       Gerente|    120|
|   6| Vicente|   35|Gerente Senior|  22000|
|   7|  Jander|   38|Gerente Senior|  20000|
|   8|   Maria|   32|       Gerente|  12000|
|   9| Gabriel|   29|       Gerente|  10000|
|  10|Kimberly|   29|     Associado|   8000|
|  11|    Alex|   28|       Gerente|  12000|
|  12| Gustavo|   25|       Gerente|  12000|
|  13|  Rafael|   31|       Gerente| 120000|
+----+--------+-----+--------------+-------+



In [0]:
#outro exemplo de preparação
autores = [['Thomas','Hardy','June 2, 1840'],\
       ['Charles','Dickens','7 February 1812'],\
        ['Mark','Twain',None],\
        ['Jane','Austen','16 December 1775'],\
      ['Emily',None,None]]
df_autores = spark.sparkContext.parallelize(autores).toDF(
       ["PrimeiroNome","UltimoNome","Dob"])

In [0]:
df_autores.show()

+------------+----------+----------------+
|PrimeiroNome|UltimoNome|             Dob|
+------------+----------+----------------+
|      Thomas|     Hardy|    June 2, 1840|
|     Charles|   Dickens| 7 February 1812|
|        Mark|     Twain|            null|
|        Jane|    Austen|16 December 1775|
|       Emily|      null|            null|
+------------+----------+----------------+



Tratando valores duplicados

In [0]:
autores = [['Thomas','Hardy','June 2,1840'],\
    ['Thomas','Hardy','June 2,1840'],\
    ['Thomas','H',None],\
    ['Jane','Austen','16 December 1775'],\
    ['Emily',None,None]]

In [0]:
df_autores = spark.sparkContext.parallelize(autores).toDF(
      ["PrimeiroNome","UltimoNome","Dob"])

In [0]:
df_autores.show()

+------------+----------+----------------+
|PrimeiroNome|UltimoNome|             Dob|
+------------+----------+----------------+
|      Thomas|     Hardy|     June 2,1840|
|      Thomas|     Hardy|     June 2,1840|
|      Thomas|         H|            null|
|        Jane|    Austen|16 December 1775|
|       Emily|      null|            null|
+------------+----------+----------------+



In [0]:
#Retirando as linhas duplicadas
df_autores.dropDuplicates().show()

+------------+----------+----------------+
|PrimeiroNome|UltimoNome|             Dob|
+------------+----------+----------------+
|      Thomas|     Hardy|     June 2,1840|
|      Thomas|         H|            null|
|        Jane|    Austen|16 December 1775|
|       Emily|      null|            null|
+------------+----------+----------------+



Transformando os dados

In [0]:
#utiliza a diretriz UDF para criar a função a ser aplicada a cada uma das celulas selecionadas
concat_func = F.udf(lambda nome, idade: nome + "_" + str(idade))

In [0]:
#aplica a função UDF (concat_func) para criar o novo dataframe
concat_df = df_final.withColumn("nome_idade", concat_func(df_final.nome, df_final.idade))

In [0]:
concat_df.show()

+----+--------+-----+--------------+-------+-----------+
|e_id|    nome|idade|         cargo|salario| nome_idade|
+----+--------+-----+--------------+-------+-----------+
|   1|    Joao|   25|     Associado|  10000|    Joao_25|
|   2| Ricardo|   35|       Gerente|  12000| Ricardo_35|
|   3|  Marcio|   24|       Gerente|  12000|  Marcio_24|
|   4|  Janete|   28|     Associado|   null|  Janete_28|
|   5|    Kely|   26|       Gerente|    120|    Kely_26|
|   6| Vicente|   35|Gerente Senior|  22000| Vicente_35|
|   7|  Jander|   38|Gerente Senior|  20000|  Jander_38|
|   8|   Maria|   32|       Gerente|  12000|   Maria_32|
|   9| Gabriel|   29|       Gerente|  10000| Gabriel_29|
|  10|Kimberly|   29|     Associado|   8000|Kimberly_29|
|  11|    Alex|   28|       Gerente|  12000|    Alex_28|
|  12| Gustavo|   25|       Gerente|  12000| Gustavo_25|
|  13|  Rafael|   31|       Gerente| 120000|  Rafael_31|
+----+--------+-----+--------------+-------+-----------+



In [0]:
#cria a função que transforma o salario de reais para dólares
from pyspark.sql.types import LongType
def realDolar(salario):
  return salario*0.25
real_dolar = F.udf(lambda salario: realDolar(salario),LongType())

In [0]:
#aplica a função UDF (real_dolar) para criar o novo dataframe
df_real_dolar = df_final.withColumn("salario_dolar", real_dolar(df_final.salario))

Correlações

In [0]:
from pyspark.mllib.stat import Statistics

In [0]:
#cria duas series para encontrar a correlação
import random #utilizada pra gerar valores randomicos
serie_1 = spark.sparkContext.parallelize(random.sample(range(1,101),10)) #cria valores randomicos
serie_2 = spark.sparkContext.parallelize(random.sample(range(1,101),10)) #cria valores randômicos
serie_3=serie_1.map(realDolar)  #aplica a transformação (realDolar) sobre a serie 1

In [0]:
#aplicando a correlação
correlacao = Statistics.corr(serie_1, serie_2, method = "pearson")
print(correlacao)

0.3834154111678642


In [0]:
#aplicando a correlação
correlacao = Statistics.corr(serie_1, serie_3, method = "pearson")
print(correlacao)

1.0000000000000002


Redução da Dimensionalidade

In [0]:
from pyspark.ml.feature import PCA  #define a utilização do PCA
from pyspark.ml.linalg import Vectors #Utilizada para criar vetores com os dados

In [0]:
datasetDigitsDire="/FileStore/tables/digitsNew.csv"

In [0]:
data = spark.read.csv(datasetDigitsDire, header=True, inferSchema=True) #carrega o arquivo


In [0]:
data.show(5)

+-----+------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-----

In [0]:
from pyspark.ml.feature import VectorAssembler  #cria o vetor de características
assembler = VectorAssembler(inputCols=data.columns[1:], outputCol='features')  #define as colunas a serem utilizadas como características

In [0]:
data_2 = assembler.transform(data)  #aplica a transformação - Vetores-Características

In [0]:
data_2.show()  #label - dígitos /  features -> 28*28 pixels =  784

+-----+------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-----

In [0]:
data_2.select("features").show()

+--------------------+
|            features|
+--------------------+
|(784,[132,133,134...|
|(784,[122,123,124...|
|(784,[124,125,126...|
|(784,[146,147,148...|
|(784,[121,122,123...|
|(784,[124,125,126...|
|(784,[202,203,204...|
|(784,[177,178,179...|
|(784,[153,154,155...|
|(784,[119,120,121...|
|(784,[180,181,182...|
|(784,[182,183,184...|
|(784,[154,155,156...|
|(784,[144,145,146...|
|(784,[122,123,124...|
|(784,[156,157,158...|
|(784,[148,149,150...|
|(784,[129,130,131...|
|(784,[206,207,208...|
|(784,[121,122,123...|
+--------------------+
only showing top 20 rows



In [0]:
from pyspark.ml.feature import PCA  #importa o PCA
pca = PCA(k=2, inputCol='features', outputCol='features_pca')  #define que queremos 2 dimensões 

In [0]:
pca_model = pca.fit(data_2)  #aplica o PCA

In [0]:
pca_data = pca_model.transform(data_2).select('features_pca')  #encontra os autovetores de duas dimensões 

In [0]:
pca_data.show(truncate=False)

+----------------------------------------+
|features_pca                            |
+----------------------------------------+
|[103.73881375798244,699.5124334036431]  |
|[2466.786278309411,360.7526613889307]   |
|[-121.55984060478046,293.96688737760746]|
|[599.5789910719535,-299.98165533942404] |
|[2689.044309475987,449.3541744175658]   |
|[1253.0865041336544,-192.5512969767518] |
|[93.01142906179628,-464.6028692847635]  |
|[650.9527788161633,20.92824777141599]   |
|[1115.5639590482808,-140.81186880240716]|
|[1062.726681921166,807.6133411755607]   |
|[1029.0169008155735,-392.58132059802983]|
|[458.80532138976895,-351.61844932900766]|
|[-200.3413397616221,359.0383591234478]  |
|[751.2639269571833,259.69710351087053]  |
|[1265.44211418056,644.6822601674015]    |
|[-199.11010313256023,282.59084272812413]|
|[762.7156949230418,668.0353590168523]   |
|[1744.7998651615999,249.56355444449798] |
|[128.31492885654367,-350.81012209179386]|
|[1731.441486490299,431.67265917387914]  |
+----------