# Estudos Pyspark e SQL

## Pyspark session

In [0]:
# !pip install pyspark

In [0]:
from pyspark.sql import SparkSession

In [0]:
spark = SparkSession.builder.appName('DataFrame').getOrCreate()

In [0]:
spark

## Arquivos csv

In [0]:
# ler arquivo csv (option caso não reconheça o header na primeira linha)
# outra opção spark.read.option('header', 'true').csv('nome_arquivo.csv', inferSchema = True).show() (ou sem .show())
# outra opção spark.read.csv('nome_arquivo.csv', header, = True, inferSchema = True).show() (ou sem .show())

df1 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/arsnfreitas@gmail.com/marca_carro.csv")
df2 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/arsnfreitas@gmail.com/marcas_duplicadas.csv")
df3 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/arsnfreitas@gmail.com/modelo_carro.csv")

In [0]:
display(df1.head(2))

In [0]:
# função display exibe os resultados
display(df3)

## Selecionando colunas

In [0]:
# exibir todas as colunas
df3.columns

In [0]:
# selecionar coluna
df3.select('modelo_carro')

In [0]:
# selecionar coluna e exibí-la (poderia usar o display())
df3.select('modelo_carro').show()

In [0]:
# selecionar colunas
display(df3.select(['id_carro', 'modelo_carro']))

In [0]:
# selecionar colunas
display(df3[['id_carro', 'modelo_carro']])

In [0]:
# describe() das variáveis
display(df3.describe())

## Adicionando, removendo e renomenado colunas

In [0]:
# adicionando coluna, removendo cifrão e alterando tipo da coluna preco
df_colunas = df3.withColumn('preco', regexp_replace('preco', '\$', '')
    ).withColumn('preco' , col('preco').cast('double')
    ).withColumn('preco_inflacionado', col('preco')*2)
                            
display(df_colunas)

In [0]:
# removendo coluna preco_inflacionado
df_colunas = df_colunas.drop('preco_inflacionado')
                            
display(df_colunas)

In [0]:
# renomeando colunas
# df3.columns = ['id_carro', 'modelo_carro', 'preco', 'cod_marca']
# df_colunas = df_colunas.withColumnRenamed(['id_carro', 'modelo_carro'], ['id', 'modelo'])
df_colunas = df_colunas.withColumnRenamed('id_carro', 'id'
                      ).withColumnRenamed('modelo_carro','modelo')

display(df_colunas)

## Salvando arquivos

In [0]:
# salvar arquivo na pasta indicada
# df3.write.format('csv').save('/estudos/modelo_carros')
# df1.write.format('csv').save('/estudos/marcas')
# df2.write.format('csv').save('/estudos/marcas_duplicadas')

# sobreescrever arquivo na pasta indicada
df3.write.format('csv').mode('overwrite').save('/estudos/modelo_carros')
df1.write.format('csv').mode('overwrite').save('/estudos/marcas')
df2.write.format('csv').mode('overwrite').save('/estudos/marcas_duplicadas')

In [0]:
# exibindo contagem de dados
print(df3.count())

# caso nao reconheça o cabeçalho, enconding ou delimitador use:
# spark.read.format('csv').option('header', True).option('enconding', 'utf-8').load('/estudos/modelo_carros', sep = ';')
df_carro = spark.read.format('csv').load('/estudos/modelo_carros')
df_carro.count()

In [0]:
# ler csv da pasta indicada
df_marcas_dup = spark.read.format('csv').load('/estudos/marcas_duplicadas')

# exibindo contagem de dados
print(df2.count())
print(df_marcas_dup.count())


# ler csv da pasta indicada
df_marcas_carro = spark.read.format('csv').load('/estudos/marcas')

# exibindo contagem de dados
print(df2.count())
print(df_marcas_carro.count())

In [0]:
df_carro
display(df_carro)

## Arquivos parquet e json

In [0]:
# salvar arquivo parquet na pasta indicada
# df_carro.write.format('parquet').save('/estudos/modelo_carros_parquet')
# df_marcas_carro.write.format('parquet').save('/estudos/marcas_parquet')
# df_marcas_dup.write.format('parquet').save('/estudos/marcas_duplicadas_parquet')

# sobrescrever arquivo parquet na pasta indicada
df_carro.write.format('parquet').mode('overwrite').save('/estudos/modelo_carros_parquet')
df_marcas_carro.write.format('parquet').mode('overwrite').save('/estudos/marcas_parquet')
df_marcas_dup.write.format('parquet').mode('overwrite').save('/estudos/marcas_duplicadas_parquet')

In [0]:
# salvar arquivo json na pasta indicada
# df_carro.write.format('json').save('/estudos/modelo_carros_json')
# df_marcas_carro.write.format('json').save('/estudos/marcas_json')
# df_marcas_dup.write.format('json').save('/estudos/marcas_duplicadas_json')

# sobrescrever arquivo json na pasta indicada
df_carro.write.format('json').mode('overwrite').save('/estudos/modelo_carros_json')
df_marcas_carro.write.format('json').mode('overwrite').save('/estudos/marcas_json')
df_marcas_dup.write.format('json').mode('overwrite').save('/estudos/marcas_duplicadas_json')

In [0]:
# ler arquivo parquet na pasta indicada
df_marcas_carro = spark.read.format('parquet').load('/estudos/marcas_parquet')
display(df_marcas_carro)

In [0]:
# ler arquivo json na pasta indicada
df_carros = spark.read.format('json').load('/estudos/modelo_carros_json')
display(df_carros)

## Comando SELECT SQL

In [0]:
# Criar tabela temporária para SQL
df_carros.createOrReplaceTempView('carros')

- %sql abre um ambiente para query SQL

In [0]:

%sql
select * from carros

In [0]:
%sql
select _c0 as id, _c1 as carros, _c2 as valor from carros

In [0]:
# spark.sql cria um dataframe para a query
df_carros_sql = spark.sql('''
                            select _c0 as id, _c1 as carros, _c2 as valor from carros
                          ''')

display(df_carros_sql)

## Comandos SELECT SPARK

In [0]:
# .select faz a query em pyspark
df_carros_spark = df_carros_sql.select('id', 'carros')
display(df_carros_spark)

In [0]:
# .selectExpr faz a query e permite alterar os nomes das colunas
df_carros_spark = df_carros.selectExpr('_c0 as id_carro', '_c1 as modelo_carros')
display(df_carros_spark)

In [0]:
# .alias também pode ser usado para alterar o nome da coluna
from pyspark.sql.functions import col
df_carros_spark = df_carros.select(col('_c0').alias('id_carro'), '_c1')
display(df_carros_spark)

## Comandos de filtro

In [0]:
# ler arquivo csv
df_carros = df_carro = spark.read.format('csv').load('/estudos/modelo_carros')

# criar tabela temprária
df_carros.createOrReplaceTempView('carros')

In [0]:
%sql

select * from carros
order by _c1

In [0]:
%sql

select * from carros
where _c0 = 1 
or _c0 = 2


In [0]:
df_carros_01 = spark.sql('''
                         select * from carros
                         where _c0 = 1 
                         or _c0 = 2
                         ''')

display(df_carros_01)

In [0]:
# Where pode ser usado para filtrar o dataframe
display(df_carros.where("_c0 = '1' or _c0 = '2'"))

In [0]:
# Filter pode ser usado para filtrar o dataframe
display(df_carros.filter("_c0 = '1' or _c0 = '2'"))

In [0]:
# Filter pode ser usado para filtrar o dataframe em várias colunas
display(df_carros.filter(
    (col('_c0') == '1')  |
    (col('_c1') == 'Golf')
    ))

In [0]:
# Filter pode ser usado para filtrar o dataframe em várias colunas
display(df_carros.where(
    (df_carros._c0 == '1')  |
    (df_carros['_c1'] == 'Golf')
    ))

## Criar Dataframe

In [0]:
# Filter pode ser usado para filtrar o dataframe em várias colunas
df_carro_pyspark = df_carros.where(
    (df_carros._c0 == '1')  |
    (df_carros['_c1'] == 'Golf')
)

display(df_carro_pyspark)

## Duplicados SQL

In [0]:
 # lendo arquivo json
 df_carros = spark.read.format('json').load('/estudos/modelo_carros_json')
 display(df_carros)

In [0]:
# criar daraframe pela query SQL com distinct para valores únicos da variável
df_carros_unicos = spark.sql('''
                         select distinct _c0 as id from carros
                         ''')

display(df_carros_unicos)

## Replace SQL

- replace(coluna, caracter a ser substituido, caracter para substituir)

In [0]:
%sql

select replace(_c2, '$', '#') as preco_num
from carros

In [0]:
%sql

select replace(_c2, '$', '') as preco_num
from carros

In [0]:
df_carros_replace = spark.sql('''
                         select replace(_c2, '$', '') as preco_num from carros
                         ''')

print(df_carros_replace.count())
display(df_carros_replace)

## Duplicados pyspark

In [0]:
# .dinstinct() para valores únicos da variável
df_carros_pyspark = df_carros.distinct()
print(df_carros_replace.count())
print(df_carros_pyspark.count())
display(df_carros_pyspark)

In [0]:
# .dropDuplicates() para remover duplicados da variável
df_carros_pyspark = df_carros.dropDuplicates()
print(df_carros_replace.count())
print(df_carros_pyspark.count())
display(df_carros_pyspark)

## Replace spark

In [0]:
from pyspark.sql.functions import regexp_replace

# withColumns para indicar qual coluna para realizar a operação - withColumns('coluna', operação)
# regex_replace(coluna, \ + caracter a ser substituido, caracter para substituir)  *** caracter a ser substituido deve ter a '\' antes para que seja identificado
df_carros_pyspark_2 = df_carros
df_carros_pyspark_2 = df_carros_pyspark_2.withColumn('_c2', regexp_replace('_c2', '\$', '#'))

display(df_carros_pyspark_2)

In [0]:
# withColumns para indicar qual coluna para realizar a operação - withColumns('coluna', operação)
# regex_replace(coluna, \ + caracter a ser substituido, caracter para substituir)  *** caracter a ser substituido deve ter a '\' antes para que seja identificado
df_carros_pyspark_3 = df_carros.withColumn('_c2', regexp_replace('_c2', '\$', ''))
display(df_carros_pyspark_3)

## Tipos dados SQL

In [0]:
from pyspark.sql.functions import *

df_carros_pyspark_2.createOrReplaceTempView('carros')

- cast(coluna as tipo de dados) nome coluna - Para indicar qual tipo de dado para a coluna

In [0]:
%sql

select 
  CAST(_c0 AS INT) ID,
  _c1 AS modelo,
  CAST(_c2 AS DOUBLE) preco,
  CAST(_c3 AS INT) marca
from carros

In [0]:
df_carros_sql = spark.sql('''
select 
  CAST(_c0 AS INT) ID,
  _c1 AS modelo,
  CAST(_c2 AS DOUBLE) preco,
  CAST(_c3 AS INT) marca
from carros
''' )

display(df_carros_sql)

In [0]:
# .printSchema() para exibir os tipos dos dados das colunas
df_carros.printSchema()

In [0]:
# .printSchema() para exibir os tipos dos dados das colunas
df_carros_sql.printSchema()

In [0]:
df_carros_sql.dtypes

## Tipos dados pyspark

In [0]:
# encadeia colunas: withColumns('coluna', col('coluna').cast('tipo do dado')).withColumns('coluna', col('coluna').cast('tipo do dado'))
df_carros_spark = df_carros
df_carros_spark = df_carros_spark.withColumn(
            '_c0' , col('_c0').cast('int')

).withColumn(
         '_c2' , col('_c2').cast('double')
)

In [0]:
from pyspark.sql.types import *

# outra maneira para alterar tipo de dados:
# .select(col('coluna).cast(tipo do dados))
df_carros_spark_2 = df_carros
df_carros_spark_2 = df_carros_spark_2.select(
           col('_c0').cast(IntegerType()),
           '_c1',
           df_carros_spark_2['_c2'].cast(DoubleType()),
           df_carros_spark_2._c2.cast('int')
)

## Like e between SQL

In [0]:
df_carros_spark_2.createOrReplaceGlobalTempView('carros')

- Contém as letras após %: like %abs
- Contém as letras antes %: like abs%
- Contém as letras entre os %: like %abs%

In [0]:
%sql

select * from carros
where _c1 like '%alo%'

In [0]:
%sql

select * from carros
where _c1 like '%rt'

In [0]:
%sql

select * from carros
where _c1 like 'Go%'

In [0]:
df_carros_sql_like = spark.sql('''
select * from carros
where _c1 like 'Go%'
                               ''')

display(df_carros_sql_like)

- Contém dos dados entre x e y: where coluna between x and y

In [0]:
%sql

select * from carros
where _c2 between 75000 and 77000

In [0]:
df_carros_sql_between = spark.sql('''
select * from carros
where _c2 between 75000 and 77000
 ''')

display(df_carros_sql_between)

## Like e between pyspark

In [0]:
 # Ler formato json e retirar $ da coluna _c2
 df_carros = spark.read.format('json').load('/estudos/modelo_carros_json')
 df_carros  = df_carros.withColumn('_c2', regexp_replace('_c2', '\$', ''))
 display(df_carros)

In [0]:
# Contém as letras após %: where(coluna.like(' %abs')
df_carros_spark = df_carros
df_carros_spark = df_carros_spark.where(
  col('_c1').like('%alo%')
)
display(df_carros_spark)

In [0]:
# Contém as letras antes  %: where(coluna.like(' abs%')
df_carros_spark = df_carros
df_carros_spark = df_carros_spark.where(
  df_carros_spark['_c1'].like('Go%')
)
display(df_carros_spark)

In [0]:
# # Contém as letras entre x e y: where(coluna.between(x, y)
df_carros_spark_4 = df_carros.where(
  col('_c2').between(75000,77000)
)
display(df_carros_spark_4)

## Funções substring, right e left SQL

In [0]:
df_carros.createOrReplaceGlobalTempView('carros')

In [0]:

display(df_carros)

- substring(coluna, posição inicial, nº de caracteres)
- left(coluna, nº de caracteres)  (da esquerda para direita)
- right(coluna, nº de caracteres)  (da direita para esquerda)

In [0]:
%sql

select 
_c1 as modelos,
substring(_c1, 2, 3) as letras,
left(_c1, 2) as letras_esquerda,
right(_c1, 2) as letras_esquerda
from carros

In [0]:
df_spark_letras = spark.sql('''
select 
_c1 as modelos,
substring(_c1, 2, 3) as letras,
left(_c1, 2) as letras_esquerda,
right(_c1, 2) as letras_esquerda
from carros           
                            ''')

display(df_spark_letras)

## Funções substring, right e left pyspark

df.withColumns('coluna', substring('coluna', posição inicial, nº de caracteres))

In [0]:
# df.withColumns('coluna', substring('coluna', posição inicial, nº de caracteres))
df_carros_pyspark = df_carros
df_carros_pyspark = df_carros_pyspark.withColumn('_c1', substring('_c1', 2, 3))
display(df_carros_pyspark)

In [0]:
# substring com right ou left
# df.withColumns('coluna', substring('coluna', posição inicial, nº de caracteres)).withColumns('coluna', expr('RIGHT("coluna", posição inicial)'))
df_carros_pyspark2 = df_carros
df_carros_pyspark2 = df_carros_pyspark2.withColumn(
'letras', substring('_c1', 2, 3)
).withColumn(
'letras_direita', expr('RIGHT(_c1, 2)')
).withColumn(
'letras_esquerda', expr('left(_c1, 2)'))
              
display(df_carros_pyspark2)

## Tipando data e hora SQL

In [0]:
# Criar dataframe de datas como string com .toDF(novo_nome_coluna)
df_datas_1 = spark.createDataFrame(['2021-07-05T10:00:00.000+0000', '2020-12-05T00:09:00.000+0000', '2017-02-23T16:23:00.000+0000'], 'string').toDF('datas')
df_datas_2 = spark.createDataFrame(['2021-07-05 10:00', '2020-12-05 00:09', '2017-02-23 16:23'], 'string').toDF('datas')
df_datas_3 = spark.createDataFrame(['05/07/2021', '05/12/2020', '23/08/207'], 'string').toDF('datas')

display(df_datas_1)
display(df_datas_2)
display(df_datas_3)

In [0]:
df_datas_1.createOrReplaceTempView('datas_1')
df_datas_2.createOrReplaceTempView('datas_2')
df_datas_3.createOrReplaceTempView('datas_3')

- Quando a data está em data + hora

In [0]:
%sql

select 
to_timestamp(datas) datas
from datas_1

In [0]:
df_datas_1_tipado = spark.sql('''
select 
to_timestamp(datas) datas
from datas_1                            
                              ''')

display(df_datas_1_tipado)

- Para manter somente a data quando há data + hora

In [0]:
%sql

select 
cast(datas as date) datas
from datas_2

In [0]:
df_datas_2_tipado = spark.sql('''
select 
cast(datas as date) datas
from datas_2                            
                              ''')

display(df_datas_2_tipado)

In [0]:
%sql

select 
to_timestamp(datas, 'yyyy-MM-dd HH:mm') datas
from datas_2

In [0]:
df_datas_22_tipado = spark.sql('''
select 
    to_timestamp(datas, 'yyyy-MM-dd HH:mm') datas
from datas_2                      
''')

display(df_datas_22_tipado)

- Data no formato brasileiro

In [0]:
%sql

select
    TO_DATE(datas, 'd/MM/y') datas
from datas_3

In [0]:
df_datas_3_tipado = spark.sql('''
select
     TO_DATE(datas, 'd/MM/y') datas
from datas_3                           
''')

display(df_datas_3_tipado)

## Tipando datas pyspark

In [0]:
# tipando com to_date('coluna')
df_datas_1_spark = df_datas_1
df_datas_1_spark = df_datas_1_spark.withColumn('datas', to_date('datas'))

display(df_datas_1_spark)

In [0]:
# tipando com to_timestamp('coluna')
df_datas_2_spark = df_datas_2
df_datas_2_spark = df_datas_2_spark.withColumn('datas', to_timestamp('datas'))

display(df_datas_2_spark)

In [0]:
# tipando com to_timestamp('coluna')
df_datas_22_spark = df_datas_2
df_datas_22_spark = df_datas_22_spark.withColumn('datas', to_timestamp('datas', 'yyyy-MM-dd HH:mm'))

display(df_datas_22_spark)

In [0]:
# tipando com to_timestamp('coluna') com formato brasileiro
df_datas_3_spark = df_datas_3
df_datas_3_spark = df_datas_3_spark.withColumn('datas', to_date('datas', 'd/MM/y'))

display(df_datas_3_spark)

## Iner, right, left join SQL

In [0]:
# importando dados csv
df_carros = spark.read.format('csv').load('/estudos/modelo_carros')
df_marcas = spark.read.format('parquet').load('/estudos/marcas_parquet').toDF('marca', 'cod_marca')

# Removendo duplicados e com id = 22 para puxar os dados com join
print(df_carros.count())
df_carros = df_carros.where(col('_c3') != '22').distinct()
print(df_carros.count())
display(df_carros)
display(df_marcas)

In [0]:
# Criando tabelas temporárias
df_carros.createOrReplaceTempView('carros')
df_marcas.createOrReplaceTempView('marcas')

- Inner join para trazer as marcas da tabela marcas

In [0]:
%sql

select c._c1 as modelos, c._c3 as cod_marca_carro, m.marca, m.cod_marca
from carros c
inner join marcas m
on c._c3 = m.cod_marca

In [0]:
df_carros_innerjoin = spark.sql('''
    select c._c1 as modelos, c._c3 as cod_marca_carro, m.marca, m.cod_marca
    from carros c
    inner join marcas m
    on c._c3 = m.cod_marca
''')

display(df_carros_innerjoin)

- left join para trazer as marcas faltantes cod_marca = 22 com null na tabela carros

In [0]:
%sql

select
m.*, c._c1
from marcas m
left join carros c
on m.cod_marca = c._c3

In [0]:
df_carros_leftjoin = spark.sql('''
    select
    m.*, c._c1
    from marcas m
    left join carros c
    on m.cod_marca = c._c3
''')

display(df_carros_leftjoin)

- right join para trazer as marcas faltantes cod_marca = 22 com null na tabela marcas

In [0]:
%sql

select
m.*, c._c1
from carros c
right join marcas m
on m.cod_marca = c._c3

In [0]:
df_carros_rightjoin = spark.sql('''
    select
    m.*, c._c1
    from carros c
    right join marcas m
    on m.cod_marca = c._c3
''')

display(df_carros_rightjoin)

## Inner, right, left join pyspark

In [0]:
# inner join df.join(df a unir, colunas iguai para join, 'inner')

df_spark_innerjoin = df_carros.join(
  df_marcas,
  (df_carros._c3 == df_marcas.cod_marca),
  'inner'
)

display(df_spark_innerjoin)

In [0]:
# right join df.join(df a unir, colunas iguai para join, 'right') com filtro nos dados

df_spark_rightjoin = df_carros['_c1', '_c3'].join(
  df_marcas,
  (df_carros._c3 == df_marcas.cod_marca),
  'right'
).select(
  df_marcas.cod_marca,
  df_marcas.marca,
  df_carros._c1
)

display(df_spark_rightjoin.filter((col('cod_marca') == '22') |
                                  (col('cod_marca') == '23')))

In [0]:
# left join df.join(df a unir, colunas iguai para join, 'left') com filtro nos dados

df_spark_leftjoin = df_marcas.join(
  df_carros,
  (df_carros._c3 == df_marcas.cod_marca),
  'left'
).select(
  df_marcas.marca,
  df_carros._c1,
  df_carros._c3
)

display(df_spark_leftjoin.filter((col('cod_marca') == '21') |
                                  (col('cod_marca') == '22')))

## Funções Exists e Left semi join SQL

In [0]:
# importando dados csv
df_carros = spark.read.format('csv').load('/estudos/modelo_carros').distinct()

# criando bases para comparação
df_carros_source = df_carros.where(
    (col('_c0') == '1') |
    (col('_c0') == '2') | 
    (col('_c0') == '3')
)

df_carros_final = df_carros.where(
    (col('_c0') == '1') |
    (col('_c0') == '2') |
    (col('_c0') == '3') |
    (col('_c0') == '4')
)

display(df_carros_final)

In [0]:
# Criar tabela temporária para SQL
df_carros_source.createOrReplaceTempView('source')
df_carros_final.createOrReplaceTempView('final')

- Retornar todos os dados de uma tabela que existem em outra tabela

In [0]:
%sql

select * 
from final f
where exists(
  select * from source s 
  where s._c0 = f._c0
)

In [0]:
df_carros_exists = spark.sql('''
select * 
from final f
where exists(
  select * from source s 
  where s._c0 = f._c0
)
''')
display(df_carros_exists)

## Funções Exists e Left semi join pyspark

In [0]:
# left semi join é equivalente ao exists do sql
# df.join(df a unir, colunas iguai para join, 'leftsemi')
df_carros_spark_leftsemi = df_carros_final.join(
    df_carros_source,
    df_carros_final._c0 == df_carros_source._c0,
    'leftsemi'
)
display(df_carros_spark_leftsemi)

## Funcções Not exists e left anti join SQL


- Retornar todos os dados de uma tabela que não existem em outra tabela

In [0]:
%sql
select * 
from final f
where not exists(
  select * from source s 
  where s._c0 = f._c0
)

In [0]:
df_carros_not_exists = spark.sql('''
select * 
from final f
where not exists(
  select 1 from source s 
  where s._c0 = f._c0
)
''')
display(df_carros_not_exists)

## Funcções Not exists e left anti join pyspark

In [0]:
# left semi join é equivalente ao exists do sql
# df.join(df a unir, colunas iguai para join, 'leftanti')
df_carros_spark_leftantisemi = df_carros_final.join(
    df_carros_source,
    df_carros_final._c0 == df_carros_source._c0,
    'leftanti'
)
display(df_carros_spark_leftantisemi)

## Operador Not e filtrando dados nulos SQL

In [0]:
# importando dados
df_carros = spark.read.format('csv').load('/estudos/modelo_carros').distinct()

df_carros_2 = df_carros.where(
    (col('_c0') == '1') |
    (col('_c0') == '2') | 
    (col('_c0') == '3')
)

# Adicionando coluna _c2 como None
df_carros_3 = df_carros.where(
   (col('_c0') == '4')
).withColumn('_c2', lit(None))

# unido dataframes
df_carros = df_carros_2.union(df_carros_3)

display(df_carros)

In [0]:
df_carros.createOrReplaceTempView('carros')

In [0]:
%sql

select
*
from carros
where _c2 is null

In [0]:
%sql

select
*
from carros
where _c2 is not null

In [0]:
df_carros_sql = spark.sql('''
    select
    *
    from carros
    where _c2 is not null                
                          ''')

display(df_carros_sql)

## Operador Not e filtrando dados nulos pyspark

In [0]:
# contagem de nulos
df_carros = df3
df_null = {col:df_carros.filter(df_carros[col].isNull()).count() for col in df_carros.columns}

display(df_null)     

In [0]:
# contagem de nulos
df_null2 = df_carros.select([count(when(isnan(c), c)).alias(c) for c in df_carros.columns])
display(df_null2)

In [0]:
# retornar todos os dados que  são nulos
df_carros_spark = df_carros.where(
  col('_c2').isNull()
)
display(df_carros_spark)

In [0]:
# retornar todos os dados que não são nulos
df_carros_spark = df_carros.where(
  col('_c2').isNotNull()
)
display(df_carros_spark)

In [0]:
# retornar todos os dados que não são nulos
df_carros_spark = df_carros.where(
  ~col('_c2').isNull()
)
display(df_carros_spark)

## Funções de agregação SQL

In [0]:
# importando dados
df_carros = spark.read.format('csv').load('/estudos/modelo_carros')

# Retirando símbolo $ e tipando como double
df_carros = df_carros.withColumn('_c2', regexp_replace('_c2', '\$',''
                                 ).cast(DoubleType())
)

# criando tabela sql temporária
df_carros.createOrReplaceTempView('carros')
display(df_carros)

In [0]:
%sql

select _c1 as modelos, sum(_c2) as sum_precos, max(_c2) as max_preco, min(_c2) as min_preco
from carros
group by _c1
order by max_preco desc

In [0]:
df_groupby_sql = spark.sql('''
    select _c1 as modelos, sum(_c2) as sum_precos, max(_c2) as max_preco, min(_c2) as min_preco
    from carros
    group by _c1
    order by max_preco desc                           
''')
display(df_groupby_sql)

## Funções de agregação pyspark

In [0]:
# Agrupando dados e ordenando com orderBy()
df_groupby_pyspark = df_carros.groupby(
  '_c1',
).agg(sum('_c2').alias('sum_preco'), 
      max('_c2').alias('max_preco'), 
      min('_c2').alias('min_preco')
).orderBy(col('max_preco').desc())

display(df_groupby_pyspark)

In [0]:
# Agrupando dados e ordenando com sort()
df_groupby_pyspark = df_carros.groupby(
  '_c1',
).agg(sum('_c2').alias('sum_preco'), 
      max('_c2').alias('max_preco'), 
      min('_c2').alias('min_preco')
).sort(col('max_preco').desc())

display(df_groupby_pyspark)

## Função row number sql

In [0]:
 # Ler formato json e retirar $ da coluna _c2
 df_carros = spark.read.format('json').load('/estudos/modelo_carros_json')
 df_carros  = df_carros.withColumn('_c2', regexp_replace('_c2', '\$', '')
                                   ).cast(DoubleType())
 display(df_carros)

In [0]:
df_carros.createOrReplaceGlobalTempView('carros')

- row_number(): retorna o número sequencial de uma linha em uma partição de um conjunto de resultados
Portanto, na query a baixo, se houver dois carros com preços diferentes, cada uma terá um índice diferente

In [0]:
%sql
select *,
row_number() over (partition by _c1 order by _c2 desc) row_number
from carros

## Função row number pyspark

In [0]:
from pyspark.sql.window import Window

# row_number(): retorna o número sequencial de uma linha em uma partição de um conjunto de resultados.
#  Portanto, no código spark abaixo, se houver dois carros com preços diferentes, cada uma terá um índice diferente
df_carros_spark = df_carros.withColumn(
  'row_number',
  row_number().over(
    Window.partitionBy('_c1').orderBy(col('_c2').desc())
  )
)

display(df_carros_spark)