In [None]:
#Base fictícia -> dados não existem

In [None]:
!pip install pyspark

In [None]:
#importacao de bibliotecas
import pandas as pd
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import functions as f
from pyspark.sql import types as t
from pyspark.sql import Row

from datetime import datetime,date

In [None]:
#configurando conexao
spark = SparkSession.builder.appName('Estudo-PySpark').getOrCreate()

In [None]:
#Para ler uma base:
base = spark.read.table('database.base')

In [None]:
#Para subir um arquivo:
base = pd.read_csv('base_teste.csv', sep = ";", header = True)

In [None]:
#Para ler de um bucket no S3

import boto3

bucket_inicio = 's3://bigdata-projeto/processados'
bucket_destino = 's3://bigdata-projeto/processados'

base = spark.read.parquet(bucket_inicio+'/')

In [None]:
#criando um data frame

colunas = ['id','nome_cliente','idade','inclusao_base']

dados = [(1,2,3,4),
         ('Marilu','Lucas','Luna','Bete'),
         (20, 18, 20, 33),
         (date(2021,9,10),date(2021,3,10),date(2021,9,10),date(2021,9,25))]

base = spark.createDataFrame(dados).DF(*colunas)
base.show()

In [None]:
#Verificando a Schema da tabela
base.printSchema()

In [None]:
#Para filtrar use Filter
#No caso a seguir, filtra-se registros com a data de hoje
#O f.col referencia a coluna
hoje = datetime.now().date()
base = base.filter(f.col('data') == hoje)

In [None]:
#Pode-se particionar a base, caso a base seja reprocessada com uma certa periodicidade

#Particionando a base pegando os dados mais recentes
win = Window.partitionBy(f.col('id_cliente')).orderBy(f.col('data_processamento').cast('date').desc())
base = base.withColumn('col_rank', f.row_number().over(win)).filter(('col_rank' == 1))

In [None]:
#Para criar uma nova coluna use WithColumn(nome da coluna, condição)
#Caso tenha que usar if, use f.when, o else é otherwise

base_feminina = base.withColumn('flag_genero',f.when(f.col('genero') == "M",'sim').otherwise('não'))\
                    .filter(f.col('flag_genero') == 'sim')

#Pode-se renomear o nome da coluna usando WithColumnRenamed
base_feminina2 = base.withColumnRenamed('genero','genero_cliente')

In [None]:
#Para selecionar colunas use Select
base_feminina.select('flag_genero','genero').show()

In [None]:
#Cruzamento entre tabelas
#duas tabelas: vendas e produtos

#Schema tb_vendas:
#        | id_venda | id_produto | quantidade | valor |
#          ----------------------------------------
#            int    |    int     |     int    | double
#          ----------------------------------------

#Schema tb_produtos:
#        | id_produto | valor  | nome_produto |
#          ----------------------------------
#             int     | double |    string    |
#          ----------------------------------

#o cruzamento será da tb_vendas com a tb_produtos para pegar nome do produto para cada venda, então será um left join
tb_vendas.join(tb_produtos, tb_vendas['id_produto'] == tb_produtos['id_produto'], 'left')

In [None]:
#Para armazenar os dados processados em um csv no S3

base.write.option("header", True).csv(bucket_destino+'/dados_processados.csv', sep=";")

#Para um diretorio
base.write.option("header", True).csv('diretorio_local')