### Sessão Spark
- Permite acessar o cluster spark.

.builder() - "Seta" a sessão.  
.getOrCreate() - Cria ou pega a sessão.  
.appName() Ajuda no gerenciamento de múltiplas sessões.

In [0]:
# Criando uma sessão com spark
from pyspark.sql import SparkSession
# Inicializando a sessão
spark = SparkSession.builder.appName("SparkSession").getOrCreate()

# Criando um dataframe
''' df = spark.read.csv("Workspace/Monthly_Transportation_Statistics.csv", header=True, inferSchema=True) '''
# Nesse caso o databricks não possui acesso para ler arquivos csv com o pyspark

# Mostrando o dataframe
''' display(df) '''


### Dataframes
- A maior diferença é a distribuição dos dados. O pandas possui uma única instância de computação enquanto o pyspark distribui os dados.

In [0]:
df = spark.sql("""
               select * 
               from workspace.bronze.b3_hist
               where datetime >= date_sub(current_date,5)
               """)

print(df.count()) # Contagem de linhas

display(df.limit(5)) # Mostrar tabela

df.printSchema() #Mostrar a estrutura do df

### Principais funções do pyspark:
.select()  
.filter() ou .where()  
.groupBy()  
.agg()  
.sort()  
.na.drop() --> Remoção de valores nulos

In [0]:
# Agrupamento e operações
display(df.groupBy('Ticker').agg({'close':'avg'}).limit(2))

display(df.filter(df['close']<5.0).select('ticker','close','datetime').limit(2))
''' 
Outras operações:
sum()
min()
max()
count() 
'''

In [0]:
# Lidando com dados faltando / valores nulos
from pyspark.sql.functions import col

df.na.drop() 
df.where(col('close').isNotNull())

df.na.fill({'close':0}) 

In [0]:
# Operações com colunas

# Renomear coluna
df.withColumnRenamed('close','fechamento')

# Adicionar uma coluna
df = df.withColumn("dff_close_open", df['close'] - df['open'])

# Removendo colunas 
df = df.drop('dif_close_open')

display(df.limit(5))

### Tipos de dados

In [0]:
from pyspark.sql.types import (StructType,
                               StructField, 
                               LongType, 
                               IntegerType,
                               StringType,
                               ArrayType
                            )

# Construção da estrutura do Dataframe
schema = StructType([
    StructField("ticker", StringType(), True),
    StructField("close", LongType(), True),
    StructField("precos", ArrayType(IntegerType()), True)
])

dados = [
    ('AAPL', 1.4, [1,2,3]),
    ('GOOG', 25.4, [4,5,6])
    ]
df2 = spark.createDataFrame(dados, schema)

### Operações avançadas
- Join
- Union
- Arrays e Maps

In [0]:
# Join

''' 
df_join = df1.join(df2,on="id",how="inner")

df_join = df1.join(
    df2,
    df1.id == df2.row_id,
    "inner"
) 
'''

# Union
''' df_union = df1.union(df2) '''

In [0]:
# Trabalhando com arrays
from pyspark.sql.functions import array, struct, lit

df_array = df.withColumn("preco",
                         array(lit(85),
                               lit(90),
                               lit(78)
                               )
                         )
display(df_array.limit(5))

In [0]:
# Trabalhando com maps
from pyspark.sql.types import StructField, StructType,StringType, MapType 

schema = StructType([
    StructField("ticker", StringType(), True),
    StructField("close", LongType(), True),
    StructField("precos", MapType(StringType(), LongType()), True)
])

In [0]:
# Trabalhando com structs
from pyspark.sql.functions import struct

df_map = df.withColumn("name_struct", struct("ticker", "datetime"))