### Configurações iniciais

In [14]:
import os
import sys
import pyspark
from pyspark.sql import SparkSession

# Point PySpark at the interpreter that is running this kernel.
os.environ["PYSPARK_PYTHON"] = sys.executable

# Get (or reuse) a session with master "local[*]".
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('SparkHelloWorld')
    .getOrCreate()
)

PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

### Dados

In [15]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Sample rows in schema order: firstname, middlename, lastname, id, sex, salary.
# The set deliberately includes a negative salary and a missing id.
data = [
    ("James", "", "Smith", "36636", "M", 3000),
    ("Michael", "Rose", "", "40288", "M", -1),
    ("Robert", "", "Williams", "42114", "M", 4000),
    ("Maria", "Anne", "Jones", "39192", "F", 4000),
    ("Jen", "Mary", "Brown", None, "F", 3000),
]

# Every column is nullable; only salary is numeric.
schema = StructType(
    [
        StructField("firstname", StringType(), nullable=True),
        StructField("middlename", StringType(), nullable=True),
        StructField("lastname", StringType(), nullable=True),
        StructField("id", StringType(), nullable=True),
        StructField("sex", StringType(), nullable=True),
        StructField("salary", IntegerType(), nullable=True),
    ]
)

df = spark.createDataFrame(data, schema)
df.show()

NameError: name 'spark' is not defined

### Filtrar (similar a SQL)

In [16]:
from pyspark.sql.functions import col

# Keep only rows that have an id and a strictly positive salary.
df_filtered = df.filter(col("id").isNotNull() & (col("salary") > 0))

df_filtered.show()

NameError: name 'df' is not defined

### Concatenar os nomes

In [4]:
from pyspark.sql.functions import concat

# concat() accepts column names directly; the three parts are joined with no separator.
df_full_name = df_filtered.withColumn(
    "fullname", concat("firstname", "middlename", "lastname")
)

df_full_name.show()

+---------+----------+--------+-----+---+------+--------------+
|firstname|middlename|lastname|   id|sex|salary|      fullname|
+---------+----------+--------+-----+---+------+--------------+
|    James|          |   Smith|36636|  M|  3000|    JamesSmith|
|   Robert|          |Williams|42114|  M|  4000|RobertWilliams|
|    Maria|      Anne|   Jones|39192|  F|  4000|MariaAnneJones|
+---------+----------+--------+-----+---+------+--------------+



### Como separar os nomes com espaços?
É necessário utilizar a função `lit()`, que converte um valor literal de tipo primitivo (por exemplo, a string `" "`) em um objeto do tipo coluna (`Column`).

In [6]:
from pyspark.sql.functions import lit

# Plain strings passed to concat() are read as column names, so the literal
# space separator has to be wrapped in lit() to become a Column.
df_full_name2 = df_filtered.withColumn(
    "fullname",
    concat("firstname", lit(" "), "middlename", lit(" "), "lastname"),
)

df_full_name2.show()

+---------+----------+--------+-----+---+------+----------------+
|firstname|middlename|lastname|   id|sex|salary|        fullname|
+---------+----------+--------+-----+---+------+----------------+
|    James|          |   Smith|36636|  M|  3000|    James  Smith|
|   Robert|          |Williams|42114|  M|  4000|Robert  Williams|
|    Maria|      Anne|   Jones|39192|  F|  4000|Maria Anne Jones|
+---------+----------+--------+-----+---+------+----------------+



### Validação: a coluna middlename está vazia? (ficaram 2 espaços em "James  Smith")
Para tratar esse caso, utilizamos CASE WHEN.

In [7]:
# Register df_filtered as a temporary view named "df" so it can be queried with SQL.
df_filtered.createOrReplaceTempView("df")

# Build fullname in SQL: skip the middle name (and its extra space) when it is empty.
# String literals use single quotes (standard SQL); double-quoted text is parsed as
# an identifier when ANSI mode with spark.sql.ansi.doubleQuotedIdentifiers is enabled.
df_full_name3 = spark.sql("""
    SELECT *,
    CASE
        WHEN middlename = '' THEN concat(firstname, ' ', lastname)
        ELSE concat(firstname, ' ', middlename, ' ', lastname)
    END AS fullname
    FROM df
    """)

df_full_name3.show()

+---------+----------+--------+-----+---+------+----------------+
|firstname|middlename|lastname|   id|sex|salary|        fullname|
+---------+----------+--------+-----+---+------+----------------+
|    James|          |   Smith|36636|  M|  3000|     James Smith|
|   Robert|          |Williams|42114|  M|  4000| Robert Williams|
|    Maria|      Anne|   Jones|39192|  F|  4000|Maria Anne Jones|
+---------+----------+--------+-----+---+------+----------------+



No código acima, criamos uma TempView chamada `df` (semelhante a uma view temporária em SQL) para podermos consultar o DataFrame por meio de uma query SQL.

### Código usando as APIs de DataFrame com o mesmo resultado da consulta acima

In [8]:
from pyspark.sql.functions import when

# DataFrame-API version of the CASE WHEN: when the middle name is empty,
# join only first and last name; otherwise join all three parts with spaces.
df_full_name4 = df_filtered.withColumn(
    "fullname",
    when(
        col("middlename") == "",
        concat("firstname", lit(" "), "lastname"),
    ).otherwise(
        concat("firstname", lit(" "), "middlename", lit(" "), "lastname")
    ),
)

df_full_name4.show()

+---------+----------+--------+-----+---+------+----------------+
|firstname|middlename|lastname|   id|sex|salary|        fullname|
+---------+----------+--------+-----+---+------+----------------+
|    James|          |   Smith|36636|  M|  3000|     James Smith|
|   Robert|          |Williams|42114|  M|  4000| Robert Williams|
|    Maria|      Anne|   Jones|39192|  F|  4000|Maria Anne Jones|
+---------+----------+--------+-----+---+------+----------------+



### Conclusão
Escrever código utilizando as APIs da classe DataFrame torna mais fácil depurar o código em caso de erro e dividi-lo em funções menores.