In [None]:
import pyspark
import pyspark.sql.utils
from pyspark.sql import SparkSession
import pandas as pd

In [None]:
#criando sessão Spark
spark = SparkSession.builder.getOrCreate()

In [None]:
#criando um dataframe a partir de uma leitura de um csv em pandas
data = pd.read_csv("https://olinda.bcb.gov.br/olinda/servico/PTAX/versao/v1/odata/CotacaoDolarPeriodo(dataInicial=@dataInicial,dataFinalCotacao=@dataFinalCotacao)?@dataInicial=%2701-01-2019%27&@dataFinalCotacao=%2712-31-2025%27&$top=9000&$format=text/csv&$select=cotacaoCompra,cotacaoVenda,dataHoraCotacao")
df = spark.createDataFrame(data)

In [None]:
df.display() #visualiza o dataframe
df.printSchema() #verifica ostipos de dados do df

In [None]:
#lendo diferentes tipos de arquivos
dfs = spark.read.format("csv").option("sep",";").option("inferschema", True).option("header",True).load("path do arquivo")
dfp = spark.read.format("parquet").option("header",True).load("path do arquivo")
dfj = spark.read.format("json").option("header",True).load("path do arquivo")

In [None]:
#com withColumn podemos fazer várias tratativas de dados conforme abaixo:(renomear colunas, alterar tipo de dados, replace etc...)
df = dfs.withColumnRenamed("Item Type","Item_Type"
                     ).withColumnRenamed("Sales Channel","Sales_Channel"
                     ).withColumnRenamed("Order Priority", "Order_Priority"
                     ).withColumnRenamed("Order Date","Order_Date"
                     ).withColumn("Order_Date",to_date("Order_Date","dd/MM/yyyy")
                     ).withColumnRenamed("Order ID", "Order_ID"
                     ).withColumn("Order_ID",col("Order_ID").cast(IntegerType())
                     ).withColumnRenamed("Ship Date","Ship_Date"
                     ).withColumn("Ship_Date",to_date("Ship_Date","dd/MM/yyyy")
                     ).withColumnRenamed("Units Sold", "Units_Sold"
                     ).withColumn("Units_Sold",col("Units_Sold").cast(IntegerType())
                     ).withColumnRenamed("Unit Price", "Unit_Price"
                     ).withColumn("Unit_Price",regexp_replace("Unit_Price",",",".").cast("float")
                     ).withColumnRenamed("Unit Cost","Unit_Cost"
                     ).withColumn("Unit_Cost",regexp_replace("Unit_Cost",",",".").cast("float")
                     ).withColumnRenamed("Total Revenue", "Total_Revenue"
                     ).withColumn("Total_Revenue",regexp_replace("Total_Revenue",",",".").cast("float")
                     ).withColumnRenamed("Total Cost","Total_Cost"
                     ).withColumn("Total_Cost",regexp_replace("Total_Cost",",",".").cast("float")
                     ).withColumnRenamed("Total Profit", "Total_Profit"
                     ).withColumn("Total_Profit",regexp_replace("Total_Profit",",",".").cast("float"))

In [None]:
#unir dois df's em um com o comando "union"
df1
df2
df3 = d1.union(df2)
#logo o df3 estará com os dados do df1 e df2

In [None]:
#contando as linhas de um df
df.count()

In [None]:
#lendo arquivos do dbfs databricks
df1 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/adones.inocencio@blueshift.com.br/marca_carro.csv")

In [None]:
#criando uma view sql
df_carros.createOrReplaceTempView("carros")

#criando um dataframe a partir de uma query sql
df_carros_sql = spark.sql("""select id_carro as id, modelo_carro as modelo from carros""")
display(df_carros_sql)

In [None]:
#Select com Spark
from pyspark.sql.functions import col
#df_carros_spark = df_carros.selectExpr("modelo_carro as modelo" , "id_carro as id") #nomeando as colunas com "as" e Expr depois do select
#nomeando com alias
df_carros_spark = df_carros.select(col("modelo_carro").alias("modelo"), col("id_carro").alias("id"))

In [None]:
#filtro com SQL
df_carros.createOrReplaceTempView("carros") #criando uma view sql

#dataframe com o resultado da consulta sql
df_carros_sql = spark.sql("""
select * from carros
where modelo_carro = "Avalon"
and id_carro = 1
or modelo_carro = "Golf"
""")

In [None]:
#Filtros usando pyspark

"""
#usando apenas where (pode usar o comando "filter" no lugar do where) 
display(
    df_carros.where("id_carro = 1")
)
"""
"""
#usando com o comando "col"
display(
    df_carros.where(
        col("id_carro") == "1")
)
"""
#usando & ou | (or)
display(
    df_carros.where(
        (col("id_carro") == "1") &
        (col("modelo_carro") == "Avalon"))
)

In [None]:
#criando um dataframe a partir da consulta

df_carro_pyspark = df_carros.where(df_carros["id_carro"] == "1")
display(df_carro_pyspark)

In [None]:
#valores distintos sql
%sql
select distinct * from carros3

In [None]:
#replace sql
%sql
select replace(preco, '$', '#') as preco from carros

In [None]:
#No pyspark para eliminar os duplicados pode usar tanto o distinct como o dropDuplicate, faz o mesmo trabalho
df_carrospyspark_3 = df_carros_3.distinct()
print(df_carrospyspark_3.count())

In [None]:
#Para substituir valores no pyspark usamos a função regexp_replace, passamos a coluna, o caractere a ser alterado (colocar a contrabarra \ antes do caractere, e depois o que você quer que seja colocado no lugar, nesse caso colocamos um espaço vazio, mas poderia ser R$)
from pyspark.sql.functions import regexp_replace

df_carrospyspark_4= df_carros_3
df_carrospyspark_4 = df_carrospyspark_4.withColumn("preco", regexp_replace("preco", "\$", ""))
display(df_carrospyspark_4)

In [None]:
#Fazendo tipagem de dados com withColumn e cast
df_carros_spark = df_carros_3
df_carros_spark = df_carros_spark.withColumn("id_carro", col("id_carro").cast("int")).withColumn("preco", col("preco").cast("double"))

In [None]:
#alterando o tipo de dado no pyspark com select...Você pode usar as funções de tipo também, IntergerType, DoubleType etc... porém tem que rodar o "from pyspark.sql.types import *"

from pyspark.sql.functions import *
df_carros_spark = df_carros_3 
df_carros_spark = df_carros_spark.select(
    col("id_carro").cast("int"),
    col("preco").cast("double"))

In [None]:
df_carros_6 = spark.read.format("csv").option("header", True).load("/mnt/adones-inocencio/modelo_carro.csv")

#substituindo o $ por um espaço vazio na coluna preço
df_carros_6 = df_carros_6.withColumn(
    "preco", regexp_replace(col("preco"), "\$", ""))

#alterando os tipos de dados das colunas
df_carros_6 = df_carros_6.select(
    col("id_carro").cast(IntegerType()),
    "modelo_carro", 
    col("preco").cast(DoubleType()),
    col("cod_marca").cast(IntegerType())

)

In [None]:
#criando uma tempview
df.createOrReplaceTempView("nova_view")

In [None]:
#No SQL usando o Like tudo que você coloca entre aspas e porcentagem ele puxa, ex: Se for uma lista de nomes e tiver 
#varios "Paulo" e você filtrar como "%ulo%" ele vai filtrar tudo que tem nesse intervalo. Se tirar o % do final ele vai puxar
#tudo que termina no intervalo que colocou, no caso tudo que acaba com "ulo", se você mtira a % do final e deixa só no inicio,
#ele filtra tudo que começa com o intervalo solicitado: "%aul%" -> Filtra o que tem dentro do intervalo "%pa" -> Filtra o que 
#começa com o intervalo "lo%" -> Filtra o que acaba com o intervalo

%sql
select *
from carros6
where modelo_carro like "Es%"

In [None]:
#usando between
%sql
select * 
from carros6
where preco between 50000 and 75000

In [None]:
#Usando o like no spark

df_carros6_spark = df_carros_6

df_carros6_spark = df_carros6_spark.where(
    col("modelo_carro").like("%alo%"))

In [None]:
#Usando o between no spark

df_carros7_spark = df_carros_6

df_carros7_spark = df_carros7_spark.where(
    col("preco").between(50000, 65000))