# Preparando ambiente para PySpark

#### Dependências para Python 3.10

In [None]:
# instalar as dependências
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz
!tar xf spark-3.4.0-bin-hadoop3.tgz
!pip install -q findspark

#### Configuração das Variáveis de Ambiente

In [None]:
# configurar as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

#### Tornar PySpark Importável

In [None]:
# tornar o pyspark "importável"
import findspark
findspark.init('spark-3.4.0-bin-hadoop3')

# Iniciando Ambiente

In [None]:
# iniciar uma sessão local e importar dados do Airbnb
from pyspark.sql import SparkSession
sc = SparkSession.builder.master('local[*]').getOrCreate()

#### Carregando dados

In [None]:
# Faça Upload do seu Arquivo
from google.colab import files
f = files.upload()

In [None]:
import pandas as pd

In [None]:
df_pd = pd.read_excel('Data.xlsx')
#Carregando dados
#df_spark = sc.read.csv("train.csv", inferSchema=True, header=True)
df_sp = sc.createDataFrame(df_pd)

# Funções Basicas

## Schema

In [None]:
# ver algumas informações sobre os tipos de dados de cada coluna
df_sp.printSchema()

root
 |-- A: long (nullable = true)
 |-- B: string (nullable = true)
 |-- C: string (nullable = true)



## Mostrar primeiras linhas

In [None]:
df_sp.show(5)

+---+------+--------------------+
|  A|     B|                   C|
+---+------+--------------------+
|  1| Email|{"meta" : 12345,"...|
|  2|Mobile|{"meta" : 12345,"...|
|  3|   PCM|{"meta" : 12345,"...|
+---+------+--------------------+



## Contar Total de Linhas e Valores Distintos de uma Coluna

In [None]:
df_sp.count()

3

In [None]:
df_sp.select('C').distinct().count()

3

## Usar alias para uma coluna

In [None]:
from pyspark.sql.functions import col

In [None]:
df = df_sp.select(col('A').alias('id'), col('B').alias('channel'), col('C').alias('data'))
df.show()

+---+-------+--------------------+
| id|channel|                data|
+---+-------+--------------------+
|  1|  Email|{"meta" : 12345,"...|
|  2| Mobile|{"meta" : 12345,"...|
|  3|    PCM|{"meta" : 12345,"...|
+---+-------+--------------------+



## Filtros

In [None]:
df.where("id = '1'").show()

+---+-------+--------------------+
| id|channel|                data|
+---+-------+--------------------+
|  1|  Email|{"meta" : 12345,"...|
+---+-------+--------------------+



In [None]:
df.where( df.id == 1).show()

+---+-------+--------------------+
| id|channel|                data|
+---+-------+--------------------+
|  1|  Email|{"meta" : 12345,"...|
+---+-------+--------------------+



In [None]:
df.where( df['id'] == 1).show()

+---+-------+--------------------+
| id|channel|                data|
+---+-------+--------------------+
|  1|  Email|{"meta" : 12345,"...|
+---+-------+--------------------+



### AND

In [None]:
df.where(
    (col('id') == "1") & (col('channel') == 'Email')
).show()

+---+-------+--------------------+
| id|channel|                data|
+---+-------+--------------------+
|  1|  Email|{"meta" : 12345,"...|
+---+-------+--------------------+



### OR

In [None]:
df.where(
    (col('id') == "1") | (col('channel') == 'Email')
).show()

+---+-------+--------------------+
| id|channel|                data|
+---+-------+--------------------+
|  1|  Email|{"meta" : 12345,"...|
+---+-------+--------------------+



## Like

In [None]:
df.where(
    col('channel').like('%mail%')
).show()

+---+-------+--------------------+
| id|channel|                data|
+---+-------+--------------------+
|  1|  Email|{"meta" : 12345,"...|
+---+-------+--------------------+



## Between

In [None]:
df.where(
    col('id').between(2, 3)
).show()

+---+-------+--------------------+
| id|channel|                data|
+---+-------+--------------------+
|  2| Mobile|{"meta" : 12345,"...|
|  3|    PCM|{"meta" : 12345,"...|
+---+-------+--------------------+



# Replace

In [None]:
from pyspark.sql.functions import regexp_replace

Usar withColumn realiza a operação Implace se você não atribuir a outro dataFrame. Ele também pode adicionar uma coluna nova, se o primeiro argumento for o nome de uma coluna que não existe no dataFrame

Se tivermos um caracter especial, como o "$" numa coluna de preço, e quisermos remove-lo a expressão ficaria:

df_rep = df.withColumn('price', regexp_replace('price', '\$', ''))

In [None]:
df_rep = df.withColumn('channel', regexp_replace('channel', 'Email', 'Mail'))
df_rep.show()

+---+-------+--------------------+
| id|channel|                data|
+---+-------+--------------------+
|  1|   Mail|{"meta" : 12345,"...|
|  2| Mobile|{"meta" : 12345,"...|
|  3|    PCM|{"meta" : 12345,"...|
+---+-------+--------------------+



# Tipagem

In [None]:
from pyspark.sql.functions import cast

In [None]:
df_wc = df.withColumn('id', col('id').cast('int'))
df_wc.printSchema()

root
 |-- id: integer (nullable = true)
 |-- channel: string (nullable = true)
 |-- data: string (nullable = true)



In [None]:
df_slct = df.select(
    col('id').cast('int'),
    col('channel'),
    col('data')
)
df_slct.printSchema()

root
 |-- id: integer (nullable = true)
 |-- channel: string (nullable = true)
 |-- data: string (nullable = true)



In [None]:
from pyspark.sql.types import IntegerType

In [None]:
df_types = df.select(
    col('id').cast(IntegerType()),
    col('channel'),
    col('data')
)
df_types.printSchema()

root
 |-- id: integer (nullable = true)
 |-- channel: string (nullable = true)
 |-- data: string (nullable = true)

