In [None]:
!pip install pyspark==3.3.2
!pip install numpy==1.26.4
!pip install pandas==1.5.3
!pip install pyarrow

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = (
  SparkSession.builder.appName("MeuApp") 
  .config('spark.sql.repl.eagerEval.enabled', True) # faz com que o dataframe seja exibido com html no jupyter
  .config("spark.sql.execution.arrow.pyspark.enabled", "true") # pro pandas funcionar
  .getOrCreate())

In [None]:
# ler arquivo csv
df = spark.read.csv("./dados-curso/LOGINS.csv", header=True, sep=";")

# ler arquivo txt
df = spark.read.csv("./dados-curso/LOGINS.txt", header=True)

# ler o arquivo parquet
df = spark.read.parquet("./dados-curso/LOGINS.parquet")

In [None]:
df.printSchema()
df.count()
df.describe()
print(df.columns)
df.dtypes

In [None]:
from pyspark.sql.functions import col

# select
df.select('email', 'senha')
df.select(df.cpf, df.data_de_nascimento)
df.select(df['estado'], df['ipv4'])
df.select(col('data_cadastro'), F.col('cpf'))
df.select('*')

In [None]:
# drop
df.drop('email')

In [None]:
#filter
df.filter(df.estado == 'MG')
df.filter("estado = 'MG'")

df.filter((df.estado == 'MG') & (df.cor_favorita == 'Azul'))
df.filter(df.estado == 'MG').filter(df.cor_favorita == 'Azul')

df.filter((df.estado == 'MG') | (df.cor_favorita == 'Azul'))

df.filter(df.cor_favorita.isin('Azul', 'Verde') & (df.estado == 'MG'))
df.where(df.cor_favorita.isin('Azul', 'Verde') & (df.estado == 'MG')) # mesma coisa que filter

In [None]:
# padrao recomendado pra filtros
# abre o parenteses pra suportar multiplas linhas sem o escape character
(
  df
  .filter(F.col('estado') == 'MG')
  .filter(F.col('cor_favorita') == 'Azul')
)

In [None]:
# criar colunas com withColumn
(
  df
  .withColumn('pais', F.lit('Brasil'))
  .withColumn('sigla_estado', F.col('estado'))
  .withColumn('num', F.lit(5))
)

(
  df
  .withColumn('nome_estados', F.when(df.estado == 'AC', 'Acre')
                              .when(df.estado == 'AL', 'Alagoas')
                              .when(df.estado == 'AP', 'Amapá')
                              .otherwise('Num sei')
  ).withColumn('flag_rosa', F.when(df.cor_favorita == 'Rosa', True).otherwise(False))
)

In [None]:
df = df.select('email', 'senha', 'estado', 'cor_favorita', 'profissao')

# funcoes de string
(
  df
  .withColumn('usuario', F.split(df.email, '@').getItem(0))
  .withColumn('provedor', F.split(df.email, '@').getItem(1))
  # ponto é um caracter reservado no pyspark. alem disso, para usar colunas que voce esta criando, é preciso usar o F.col()
  .withColumn('nome_provedor', F.split(F.col('provedor'), '\\.').getItem(0))
  .withColumn('concat', F.concat(df.profissao, df.cor_favorita))
  .withColumn('concat2', F.concat(df.profissao, F.lit(' - Brasil')))
  .withColumn('lower', F.lower(df.profissao))
  .withColumn('lower', F.upper(df.profissao))
  .withColumn('initcap', F.initcap(df.profissao))
)

(
  df
  .withColumn('substr', F.substring(df.cor_favorita, 0, 3))
  .withColumn('format_string', F.format_string('= %s %s', df.cor_favorita, F.col('substr')))
  .withColumn('instr', F.instr(df.email, '@'))
  .withColumn('len', F.length(df.email))
  .withColumn('repeat', F.repeat(df.estado, 5))
  .withColumn('trim', F.trim(df.profissao))
  .withColumn('lpad', F.lpad(df.cor_favorita, 10, '-'))
)

In [None]:
df = spark.read.parquet("./dados-curso/IMC.parquet")
#funcoes numericas
(
  df
  .withColumn('round', F.round(df.peso, 1))
  .withColumn('ceil', F.ceil(df.peso))
  .withColumn('floor', F.floor(df.peso))
  .withColumn('abs', F.abs(df.peso))
  .withColumn('pow', F.pow(df.peso, 2))
  .withColumn('sqrt', F.sqrt(F.col('pow')))
  .withColumn('IMC', F.round(F.pow(df.peso, 2) / df.altura, 1))
)

In [None]:
df = spark.read.parquet("./dados-curso/LOGINS.parquet").select('data_de_nascimento', 'data_cadastro')

# funcoes de data
(
  df
  .withColumn('add_months', F.add_months(df.data_cadastro, 1))
  .withColumn('add_months2', F.add_months(df.data_cadastro, -1))
  .withColumn('current_date', F.current_date())
  .withColumn('current_timestamp', F.current_timestamp())
  .withColumn('date_add', F.date_add(df.data_cadastro, 1))
  .withColumn('date_add2', F.date_add(df.data_cadastro, -1))
  .withColumn('date_format', F.date_format(df.data_cadastro, 'dd/MM/yyyy'))
  .withColumn('date_diff', F.datediff(F.current_date(), df.data_de_nascimento ))
  .withColumn('day_of_month', F.dayofmonth(df.data_cadastro))
  .withColumn('day_of_week', F.dayofweek(df.data_cadastro))
  .withColumn('weekofyear', F.weekofyear(df.data_cadastro))
  .withColumn('year', F.year(df.data_cadastro))
  .withColumn('month', F.month(df.data_cadastro))
)

(
  df
  .withColumn('last_day', F.last_day(df.data_cadastro)) # ultimo dia do mes
  .withColumn('next_day', F.next_day(df.data_cadastro, 'Mon'))
  .withColumn('months_between', F.months_between(F.current_date(), df.data_de_nascimento))
  .withColumn('make_date', F.make_date(F.lit(2023), F.lit(1), F.lit(1)))
  .withColumn('to_date', F.to_date(F.lit('2023-01-01'), 'yyyy-MM-dd'))
)



In [None]:
# order by
df = spark.read.parquet("./dados-curso/LOGINS.parquet")
(
  df
  .orderBy('email')
)

(
  df
  .orderBy(F.desc('email'), F.asc('estado'))
)

In [None]:
# group by
(
  df
  .groupBy(df.estado, 'cor_favorita').count()
  .groupBy(df.estado).sum('count')
)

df.groupBy(df.estado, 'cor_favorita').count().groupBy(df.estado).avg('count')
df.groupBy(df.estado, 'cor_favorita').count().groupBy(df.estado).min('count')
df.groupBy(F.year(df.data_de_nascimento)).count()

In [None]:
# aggregate
df = spark.read.parquet("./dados-curso/LOGINS.parquet")
df = df.withColumn('num',F.dayofmonth(df.data_de_nascimento))

df.groupBy(df.cor_favorita).agg(F.count('*')) # count sem o asterisco nao conta as linhas com valores nulos
df.groupBy(df.cor_favorita).agg(F.count('*'), F.sum(df.num), F.min(df.num), F.max(df.num), F.avg(df.num), F.stddev(df.num))
df.groupBy(df.cor_favorita).agg({'num': 'sum', '*': 'count'})

In [None]:
# join
autores = spark.read.parquet("./dados-curso/AUTORES.parquet")
livros = spark.read.parquet("./dados-curso/LIVROS.parquet")
clientes = spark.read.parquet("./dados-curso/CLIENTES.parquet")
compras = spark.read.parquet("./dados-curso/COMPRAS.parquet")

autores.join(livros, autores.id == livros.id, 'left') # inner, left, right, full, semi, anti
autores.join(livros, on='id', how='left') # inner, left, right, full, semi, anti
#autores.join(livros, on=['id', 'segunda_coluna'], how='left') # inner, left, right, full, semi, anti

compras.join(livros, compras.cd_livro==livros.id, 'inner')

# multiplas condicoes
cond = [compras.cd_livro == livros.id, livros.preco > 20, compras.cartao_bandeira == 'American Express']
cond = [(compras.cd_livro == livros.id) & (livros.preco > 20) | (compras.cartao_bandeira == 'American Express')]
compras.join(livros, cond, 'inner')

In [None]:
# alias
df = spark.read.parquet("./dados-curso/LOGINS.parquet")

df.groupBy(df.estado.alias('Estado')).agg(F.count('*').alias('qtd'))

df1= df.alias('teste')
df1.select(df1.cpf, 'teste.cpf', F.col('teste.cpf'))

livros = spark.read.parquet("./dados-curso/LIVROS.parquet").alias('livros')
compras = spark.read.parquet("./dados-curso/COMPRAS.parquet").alias('compras')

df2 = compras.join(livros, F.col('cd_livro') == F.col('livros.id'), 'inner')
df2.select('livros.*', compras.data)

In [None]:
# union all 
df = spark.read.parquet("./dados-curso/LIVROS.parquet").limit(5)
dup = df.unionByName(df)
display(dup)

# drop duplicates
display(dup.dropDuplicates())

df4 = spark.read.parquet("./dados-curso/LIVROS.parquet").where(F.col('numero_paginas').isin(461,209))
display(df4)

df4.drop_duplicates(['numero_paginas'])

In [None]:
# funcoes de coluna
df = spark.read.parquet("./dados-curso/LOGINS.parquet")

(
  df
  .withColumn('tel', F.regexp_replace('telefone', r'(\D)', ''))
  .orderBy(F.asc('tel'))
  .where(F.year('data_de_nascimento').between(2014,2015))
)

(
  df
  .select(df.email, df.estado)
  .where(F.col('email').contains('joao'))
  .where(F.col('email').endswith('.br'))
)

In [None]:
# rodar sql
display(df.where(F.col('estado').isin('PR', 'SP')).limit(5))

spark.sql('''
  SELECT *
  FROM {tabela}
  WHERE estado IN ('PR', 'SP')
  LIMIT(5)
''', tabela=df)

# ou

df.registerTempTable('tabela')
spark.sql('''
  SELECT *
  FROM tabela
  WHERE estado IN ('PR', 'SP')
  LIMIT(5)
''')

In [None]:
# pivot e unpivot
df = spark.read.parquet("./dados-curso/COMPRAS.parquet")

# transforma meses que sao janeiro e fevereiro em colunas
(
  df
  .withColumn('mes', F.date_format(df.data, "MMMM"))
  .groupBy(df.cartao_bandeira)
  .pivot('mes', ['January', 'February'])
  .agg(F.count('*'))
)
# pivot na coluna inteira
(
  df
  .withColumn('mes', F.date_format(df.data, "MMMM"))
  .groupBy(df.cartao_bandeira)
  .pivot('mes')
  .agg(F.count('*'))
)

# unpivot
df2 = (
  df
  .withColumn('mes', F.date_format(df.data, "MMMM"))
  .groupBy(df.cartao_bandeira)
  .pivot('mes', ['January', 'February'])
  .agg(F.count('*'))
)

(
  df2
  .select('cartao_bandeira', F.expr('stack(2, "Jan", January, "Fev", February) AS (mes, valor)'))
)

In [None]:
# exportando
df = spark.read.parquet("./dados-curso/LOGINS.parquet")
df.write.save('output', mode='overwrite', format='csv', partitionBy=['estado']) # nao funciona nesse ambiente. por padrao salva como parquet
df.write.mode('overwrite').csv('nomedoarquivo')
df.write.saveAsTable('db.nome_da_tabela' mode='overwrite')

In [None]:
# criar dataframe

dados = [
  {'nome': 'Dalton', 'Idade': 27},
  {'nome': 'Alice', 'Idade': 41},
  {'nome': 'Lara', 'Idade': 12}
]

spark.createDataFrame(dados)
# ou no método antigo (evitar)
spark.createDataFrame(dados, "nome: string, idade: int")

dados2 = [
  ('Dalton', 27),
  ('Alice', 41),
  ('Lara', 12)
]

spark.createDataFrame(dados2, ['nome', 'idade'])

# com schema

import pyspark.sql.types as T

schema = T.StructType([
  T.StructField('nome', T.StringType(), nullable=True)
  T.StructField('idade', T.IntegerType(), nullable=True)
])

spark.createDataFrame(dados2, schema)

# usando estrutura row

from pyspark.sql import Row

rdd = [
  Row('Dalton', 27),
  Row('Alice', 41)
]

spark.createDataFrame(rdd, '_1: string', '_2: int')

In [None]:
# spark para pandas
df = spark.read.parquet("./dados-curso/LOGINS.parquet")
#df.toPandas() # as vezes buga ou acaba  memoria do cluster

# melhor forma
import pandas as pd

df_pandas = pd.DataFrame(df.collect(), columns=df.columns)
df_pandas

# pandas para spark
import pyspark.pandas as ps

df_pypandas = ps.DataFrame(df_pandas)
df = df_pypandas.to_spark()
df

In [None]:
# user defined function
from pyspark.sql.functions import udf
df = spark.read.parquet("./dados-curso/LOGINS.parquet")

@udf(returnType=T.StringType())
def nome_estado(sigla): # sigla é uma coluna do pyspark
  if sigla == 'SP':
    return 'São Paulo'
  return 'Outros'

df.select(df.estado, nome_estado(df.estado))

In [26]:
# windows functions
from pyspark.sql.window import Window

autores = spark.read.parquet("./dados-curso/AUTORES.parquet").alias('autores')
livros = spark.read.parquet("./dados-curso/LIVROS.parquet").alias('livros')
compras = spark.read.parquet("./dados-curso/COMPRAS.parquet").alias('compras')

df = compras.join(livros, compras.cd_livro == livros.id).join(autores, livros.id == autores.id).drop('livros.id', 'autores.id')

window1 = Window.orderBy('compras.id') 
window2 = Window.partitionBy('cd_cliente').orderBy('data')
window3 = Window.partitionBy('autor').orderBy('data_lancamento')
window4 = Window.partitionBy('cd_cliente')

(
  df
  .withColumn('num_linha', F.row_number().over(window1))
  .withColumn('num_compra', F.row_number().over(window2))
  .drop_duplicates(['cd_livro', 'autor']) # pra não dar pau nessa window 3. xunxo
  .withColumn('num_livro', F.row_number().over(window3))
  .withColumn('total_acumulado_cliente', F.round(F.sum('preco').over(window4), 2)) # window 2 e 4 tem diferença de resultado!
)

id,cartao_data_expiracao,cartao_numero,cartao_bandeira,cartao_cvc,codigo_transacao_bancaria,data,hora,ipv4,ipv6,cep_entrega,cd_livro,cd_cliente,id.1,cnpj_editora,data_lancamento,ean,isbn10,numero_paginas,preco,id.2,titulo,autor,num_linha,num_compra,num_livro,total_acumulado_cliente
845072,02/25,180001948797147,VISA 16 digit,942,GB23IIEG688147135...,2023-03-11,18:13:33,152.251.231.89,3405:80ca:f878:9f...,47087406,60875372,1010444,60875372,51.639.042/0001-08,2016-08-14,5788041198904,0-647-31874-1,490,247.14,60875372,O Sonho da Câmara...,Cao Xueqin,15506,12,1,1904.53
162408,10/32,213113956445062,Diners Club / Car...,925,GB14AEWV654918131...,2021-06-16,11:33:27,89.251.166.47,4985:7a40:1618:e8...,79137018,36059407,1010444,36059407,97.230.418/0001-27,2021-01-20,5318278006040,1-4565-9536-9,527,222.98,36059407,As Histórias Comp...,Franz Kafka,2874,2,2,1904.53
184567,06/24,4447300963875411,VISA 16 digit,104,GB66BMUY634345621...,2022-03-29,05:18:04,199.105.161.129,5f35:515a:c8cb:3b...,74985-735,20414016,1010444,20414016,93.784.652/0001-45,2011-06-09,7121708142144,0-9779047-3-3,343,212.1,20414016,Vidas Secas,Graciliano Ramos,3335,7,1,1904.53
184365,02/29,3596916750973605,Mastercard,0,GB56HYJT915418140...,2020-10-31,13:44:40,189.124.137.181,d918:3c58:7886:b0...,41327-956,100520231,1010444,100520231,19.072.435/0001-70,2009-08-17,9569536353895,1-124-02272-4,239,35.55,100520231,Galáxias,Haroldo de Campos,3330,1,1,1904.53
561293,05/27,4044155743405187,Maestro,462,GB46XASR659792813...,2021-11-02,16:59:29,6.42.185.244,d0ce:e0de:d8e8:f1...,10034671,30144651,1010444,30144651,24.631.097/0001-07,2011-04-14,5139658472558,0-8390-6301-6,146,242.2,30144651,Zero,Ignácio De Loyola...,10118,5,1,1904.53
594608,06/26,180033694345318,VISA 16 digit,236,GB90YJQC721422122...,2021-10-11,07:38:54,109.94.247.129,b3b6:e461:6305:19...,02199495,30099528,1010444,30099528,28.103.975/0001-64,2000-02-06,9164658045347,0-904222-71-3,292,208.39,30099528,Retrato do Artist...,James Joyce,10824,4,1,1904.53
472081,05/24,4329111905286,JCB 15 digit,843,GB11ESYC978809959...,2022-06-19,15:19:37,33.133.249.145,4c06:dcb2:a759:7e...,33553602,26925428,1010444,26925428,97.156.423/0001-37,2011-09-04,7828100308177,0-491-15648-0,793,193.19,26925428,"Sing, Unburied, S...",Jesmyn Ward,8520,9,1,1904.53
27189,07/32,3584571820190963,Mastercard,665,GB05FFLE391061682...,2022-06-26,07:49:06,16.231.207.208,3e1a:7fff:21a9:f3...,36368406,12489208,1010444,12489208,76.035.284/0001-13,2011-10-30,5442868630558,1-927216-40-0,113,160.53,12489208,Triste Fim de Pol...,Lima Barreto,322,10,1,1904.53
26947,02/26,4280851171930570,VISA 16 digit,182,GB44ZIZZ959153753...,2022-05-04,21:03:31,69.102.13.3,d78d:4a7b:8dc:a09...,19716-926,36030824,1010444,36030824,08.513.946/0001-63,2005-01-26,1064959235400,0-634-13210-5,716,226.36,36030824,Tremor de Terra,Luiz Vilela,317,8,1,1904.53
629783,04/32,3574500512069120,JCB 16 digit,5986,GB50JGZS995303032...,2021-09-14,03:56:31,201.173.147.62,b79b:2d82:4c7c:a8...,91427-784,16581063,1010444,16581063,02.539.486/0001-66,2013-05-16,4572823461710,0-906264-22-7,546,145.25,16581063,Macunaíma – O Her...,Mário de Andrade,11521,3,1,1904.53
