<a href="https://colab.research.google.com/github/GaboRamalho/Big-Data/blob/main/Processamento_e_An%C3%A1lise_Descritiva_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark

In [None]:
# Iniciar uma sessão local e importar dados
from pyspark.sql import SparkSession
sc = SparkSession.builder.master('local[*]').getOrCreate()

# **BASE DE DADOS 1**

## Explorando a base de dados

In [None]:
df = sc.read.csv('terremotos.csv', header=True, sep=";",  inferSchema=True)
df.show(5)

In [None]:
df.show(5, truncate=False)

### Visualizando as colunas

In [None]:
df.columns

<a id='dataframe-schema'></a>
### Dataframe Schema

In [None]:
df.dtypes

NameError: ignored

In [None]:
df.printSchema()

## Colunas



1.   Selecionar coluna
2.   Selecionar várias colunas
3.   Adicionar coluna
4.   Renomear
5.   Agrupar
6.   Remover




### Selecionando 1 coluna

In [None]:
df.select(df.Magnitude).show(10)

In [None]:
from pyspark.sql.functions import col
df.select(col('Magnitude')).show(10)


### Selecionando múltiplas colunas

In [None]:
df.select(col('Magnitude'),col('Profundidade(km)')).show(10)


### Adicionar nova coluna

#### Datas

In [None]:
from pyspark.sql.functions import *
df = df.withColumn('Hora_', hour(df.Hora))
df = df.withColumn('Minuto', minute(df.Hora))
df.show(3, truncate=False)

In [None]:
df = df.withColumn('Ano', year(to_date(df.Data, 'yyyy.MM.dd')))
df = df.withColumn('Mês', month(to_date(df.Data, 'yyyy.MM.dd')))
df.show(3, truncate=False)

In [None]:
df = df.withColumn('Mult', (col('Profundidade(km)')*col('Magnitude')))
df.show(3)


### Renomear Coluna

In [None]:
df = df.withColumnRenamed('Tipo', 'TipoTerremoto')
df.show(3, truncate=False)


### Agrupando por colunas

In [None]:
df.groupBy('Ano').count().show(10)

In [None]:
df.groupBy('TipoTerremoto').count().show()

In [None]:
df.groupBy('TipoTerremoto', 'Ano').count().show(20)


### Remover Colunas

In [None]:
df = df.drop('Data','Hora')
df.show(5,truncate=False)


## Linhas


1. Filtrando Linhas
2. Obtedo linhas distintas
3. Classificando Linhas





### Filtrando/selecionando por linhas

In [None]:
total_contagem = df.count()
print("TOTAL Contagem: " + str(total_contagem)+'\n')

ano_2022_contagem = df.filter(col('Ano')==2022).count()
print("2022 Contagem: " + str(ano_2022_contagem)+'\n')

base_2022 = df.filter(col('Ano')==2022).show(truncate=False)

In [None]:
ano_2022_magnitude = df.filter((col('Ano')==2022) &
                                  (col('Magnitude')>4)).count()

print("2022 Contagem:" + str(ano_2022_magnitude)+'\n')

df.filter((col('Ano')==2022) &
                              (col('Magnitude')>4)).show(truncate=False)


### Distinct

In [None]:
df.select('Ano').distinct().show()

In [None]:
df.select('Ano','TipoTerremoto').distinct().show()


### Classificando linhas

In [None]:
df.orderBy('Ano').show(truncate=False)

In [None]:

df.groupBy("Ano").count().orderBy('count', ascending=False).show(10)

# **Dados para pandas**

In [None]:
df_pandas = df.toPandas()

In [None]:
df_pandas.head()

In [None]:
df_pandas.dtypes

In [None]:
# Importar matplotlib.
import matplotlib.pyplot as plt
# Importar seaborn.
import seaborn as sns

In [None]:
# Histograma para aas variaveis numéricas.
plt.hist(df_pandas.Magnitude)
plt.xlabel('Magnitude')
plt.ylabel('Frequência')
plt.show()

In [None]:
plt.boxplot(df_pandas.Magnitude)
plt.show()

In [None]:
# Plotando dispersão de dados = relacionando duas variáveis numéricas.
plt.figure(figsize = (7,4))
plt.scatter(
    df_pandas['Profundidade(km)'],
    df_pandas['Magnitude'],
    c='blue')

plt.xlabel("Profundidade")
plt.ylabel("Magnitude")
plt.show()

In [None]:
# Verifcando categorias ou classes
classes_tipos_terremotos = df_pandas['TipoTerremoto'].value_counts()
nome_classes = ['Ke','Sm']
quantidade_classes = [49886,114]
fig = plt.figure(figsize =(7, 7))
plt.pie(quantidade_classes, labels = nome_classes, autopct='%1.2f%%')
plt.show()

In [None]:
# Gráfico de barras com seaborn
sns.countplot(x='TipoTerremoto', data=df_pandas)
plt.show()

In [None]:
# Gráfico de barras com seaborn
sns.countplot(x='Mês', data=df_pandas)
plt.show()

In [None]:
# Gráfico de barras com seaborn
fig = plt.figure(figsize=(15,6))
sns.countplot(x='Ano', data=df_pandas)
plt.show()

In [None]:
# Gráfico de barras com seaborn
fig = plt.figure(figsize=(15,6))
sns.countplot(x='Ano', data=df_pandas, order = df_pandas["Ano"].value_counts().index)
plt.show()

In [None]:
# Gráfico de barras com seaborn
sns.countplot(x='Hora_', data=df_pandas)
plt.show()

In [None]:
# Grafico gerado entre magnitide, profundidade e tipo do terremoto.
sns.relplot(
    data=df_pandas,
    x="Profundidade(km)", y="Magnitude", hue="Ano", palette="rocket_r")
plt.show()

In [None]:
# Plotar o boxplot de uma variável em relação as classes com seaborn.

plt.subplots( figsize=(8, 6))
sns.boxplot(x='TipoTerremoto', y='Magnitude', data=df_pandas)
plt.show()

In [None]:
# Plotar o boxplot de uma variável em relação as classes com seaborn.

plt.subplots( figsize=(15, 6))
sns.boxplot(x='Ano', y='Magnitude', data=df_pandas)
plt.show()

# **BASE DE DADOS 2**

In [None]:
df2 = sc.read.csv('Carros.csv', header=True, sep=";",  inferSchema=True)
df2.show(5)


### String

In [None]:
from pyspark.sql import functions
print(dir(functions))

In [None]:
from pyspark.sql.functions import col, lower, upper, substring

In [None]:
df2.select(col('Carro'),lower(col('Carro')),upper(col('Carro')),substring(col('Carro'),1,4).alias("concatenated value")).show(3, False)

In [None]:
df2 = df2.withColumn('Carro', upper(df2.Carro)).show(5, False)


### Numérico

In [None]:
from pyspark.sql.functions import min, max
df2.select(min(df2.Peso), max(df2.Peso)).show()


## SQL

In [None]:

df = sc.read.csv('Carros.csv', header=True, sep=";")
# Tabela temporária
df.createOrReplaceTempView("temp")

sc.sql("select * from temp limit 15").show()

sc.sql("select count(*) as total_count from temp").show()