01 - Carregar Data

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

df1 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/gabrielmarinho0812@gmail.com/marca_carro.csv")
df2 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/gabrielmarinho0812@gmail.com/marcas_duplicadas.csv")
df3 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/gabrielmarinho0812@gmail.com/modelo_carro.csv")

df3.limit(10).display()

id_carro,modelo_carro,preco,cod_marca
1,Avalon,$78401.95,54
2,RDX,$95987.38,1
3,Golf,$61274.55,55
4,EX,$84981.12,23
5,Escort,$77466.89,17
6,Expedition,$84698.71,17
7,Voyager,$95567.75,42
8,Civic,$84749.22,20
9,Defender,$98600.79,29
10,V8 Vantage S,$94791.61,2


In [0]:
df1.write.format("csv").option("header", True).mode("overwrite").save("/teste/marca_carro")
df2.write.format("csv").option("header", True).mode("overwrite").save("/teste/marcas_duplicadas")
df3.write.format("csv").option("header", True).mode("overwrite").save("/teste/modelo_carro")

In [0]:
df1.limit(10).display()
df_marca = spark.read.format("csv").option("header", True).load("/teste/marca_carro")
df_marca.limit(10).display()

marca_carro,cod_marca
Acura,1
Aston Martin,2
Audi,3
Austin,4
BMW,5
Bentley,6
Bugatti,7
Buick,8
Cadillac,9
Chevrolet,10


marca_carro,cod_marca
Acura,1
Aston Martin,2
Audi,3
Austin,4
BMW,5
Bentley,6
Bugatti,7
Buick,8
Cadillac,9
Chevrolet,10


02 - Select e SQL


In [0]:
df_carros = spark.read.format("csv").option("header", True).load("/teste/modelo_carro")
df_carros.createOrReplaceTempView("carros")
df_marca.createOrReplaceTempView("marca")

df_carros = spark.sql("""
    select modelo_carro as Modelo, preco as Preco from carros
""")

df_carros.limit(10).display()

Modelo,Preco
Avalon,$78401.95
RDX,$95987.38
Golf,$61274.55
EX,$84981.12
Escort,$77466.89
Expedition,$84698.71
Voyager,$95567.75
Civic,$84749.22
Defender,$98600.79
V8 Vantage S,$94791.61


In [0]:
%sql

select modelo_carro as Modelo, preco as Preco, marca_carro as Marca from carros
inner join marca
on carros.cod_marca = marca.cod_marca
where id_carro <= 10

Modelo,Preco,Marca
Avalon,$78401.95,Toyota
RDX,$95987.38,Acura
Golf,$61274.55,Volkswagen
EX,$84981.12,Infiniti
Escort,$77466.89,Ford
Expedition,$84698.71,Ford
Voyager,$95567.75,Plymouth
Civic,$84749.22,Honda
Defender,$98600.79,Land Rover
V8 Vantage S,$94791.61,Aston Martin


03 - Filtrar Data

In [0]:
df_carros = spark.read.format("csv").option("header", True).load("/teste/modelo_carro")

display(
    df_carros.where("modelo_carro = 'Civic'")
    #df_carros.where(df_carros.modelo_carro == 'Civic')
    #df_carros.where(df_carros['modelo_carro'] == 'Civic')   
)


df_carros.where((col("modelo_carro") == "Truck") | (col("cod_marca") == 20)).limit(10).display()


id_carro,modelo_carro,preco,cod_marca
8,Civic,$84749.22,20
297,Civic,$96044.05,20
547,Civic,$92109.53,20


id_carro,modelo_carro,preco,cod_marca
8,Civic,$84749.22,20
23,Accord,$67419.94,20
40,Fit,$55448.35,20
75,del Sol,$80291.13,20
87,Truck,$57007.15,39
132,Prelude,$73902.58,20
140,Odyssey,$91804.80,20
155,Ridgeline,$51724.74,20
188,Fit,$99627.45,20
203,Truck,$90769.69,39


04 - Duplicatas e Replace

In [0]:
df_carros = df_carros.dropDuplicates()
df_carros = df_carros.withColumn("preco", regexp_replace('preco', "\$", ''))

df_carros.limit(10).display()

id_carro,modelo_carro,preco,cod_marca
3,Golf,61274.55,55
4,EX,84981.12,23
7,Voyager,95567.75,42
2,RDX,95987.38,1
8,Civic,84749.22,20
5,Escort,77466.89,17
1,Avalon,78401.95,54
6,Expedition,84698.71,17
9,Defender,98600.79,29
10,V8 Vantage S,94791.61,2


05 - Conversão de tipo

In [0]:
df_carros_sql = df_carros
df_carros_sql.createOrReplaceTempView("carros")

df_carros_sql = spark.sql("""

    SELECT
        cast(id_carro as INT) id_carro,
        modelo_carro,
        cast(preco as DOUBLE) preco,
        cast(cod_marca as INT) cod_marca
    FROM carros
""")

df_carros_sql.limit(10).display()
df_carros_sql.printSchema()

id_carro,modelo_carro,preco,cod_marca
3,Golf,61274.55,55
4,EX,84981.12,23
7,Voyager,95567.75,42
2,RDX,95987.38,1
8,Civic,84749.22,20
5,Escort,77466.89,17
1,Avalon,78401.95,54
6,Expedition,84698.71,17
9,Defender,98600.79,29
10,V8 Vantage S,94791.61,2


root
 |-- id_carro: integer (nullable = true)
 |-- modelo_carro: string (nullable = true)
 |-- preco: double (nullable = true)
 |-- cod_marca: integer (nullable = true)



In [0]:
df_carros_spark = df_carros

df_carros_spark = df_carros_spark.withColumn(
    "id_carro", col('id_carro').cast('int')
).withColumn(
    'preco', col('preco').cast('double')
).withColumn(
    'cod_marca', col('cod_marca').cast('int')
)

df_carros_spark.limit(10).display()
df_carros_spark.printSchema()

id_carro,modelo_carro,preco,cod_marca
3,Golf,61274.55,55
4,EX,84981.12,23
7,Voyager,95567.75,42
2,RDX,95987.38,1
8,Civic,84749.22,20
5,Escort,77466.89,17
1,Avalon,78401.95,54
6,Expedition,84698.71,17
9,Defender,98600.79,29
10,V8 Vantage S,94791.61,2


root
 |-- id_carro: integer (nullable = true)
 |-- modelo_carro: string (nullable = true)
 |-- preco: double (nullable = true)
 |-- cod_marca: integer (nullable = true)



07 - Like e Between

In [0]:
df_carros_like = df_carros
df_carros_between = df_carros

df_carros_like = df_carros_like.where(
    col("modelo_carro").like("%elud%")
)

df_carros_between = df_carros_between.where(
    col("preco").between(60000, 62000)
)

df_carros_like.limit(10).display()
df_carros_between.limit(10).display()

id_carro,modelo_carro,preco,cod_marca
257,Prelude,85968.61,20
318,Prelude,60673.19,20
132,Prelude,73902.58,20


id_carro,modelo_carro,preco,cod_marca
326,RAV4,61721.3,54
318,Prelude,60673.19,20
70,I,60877.56,23
3,Golf,61274.55,55
419,RX,60724.95,30
261,Bonneville,60844.14,43
135,Solara,60334.46,54
322,Metro,61533.51,19
325,Express 1500,61925.86,10
60,RL,60125.97,1


08 - Substring, RIGHT e LEFT

In [0]:
df_carros_spark = df_carros

df_carros_spark = df_carros_spark.withColumn(
    "modelo_sub", substring("modelo_carro", 2, 4)
).withColumn(
    "modelo_left", expr("LEFT(modelo_carro, 3)")
).withColumn(
    "modelo_right", expr("RIGHT(modelo_carro, 2)")
)

df_carros_spark.limit(10).display()

id_carro,modelo_carro,preco,cod_marca,modelo_sub,modelo_left,modelo_right
3,Golf,61274.55,55,olf,Gol,lf
4,EX,84981.12,23,X,EX,EX
7,Voyager,95567.75,42,oyag,Voy,er
2,RDX,95987.38,1,DX,RDX,DX
8,Civic,84749.22,20,ivic,Civ,ic
5,Escort,77466.89,17,scor,Esc,rt
1,Avalon,78401.95,54,valo,Ava,on
6,Expedition,84698.71,17,xped,Exp,on
9,Defender,98600.79,29,efen,Def,er
10,V8 Vantage S,94791.61,2,8 Va,V8,S


09 - Tipando Data


In [0]:
df_datas_1 = spark.createDataFrame(["2024-07-05T10:00:00.000+0000", "2023-12-05T00:09:00.000+0000", "2017-02-23T16:23:00.000+0000"], "string").toDF("datas")
df_datas_2 = spark.createDataFrame(["2021-07-05 10:00", "2020-12-05 00:09", "2024-02-23 16:23"], "string").toDF("datas")
df_datas_3 = spark.createDataFrame(["05/07/2021", "05/12/2020", "23/02/2017"], "string").toDF("datas")

display(df_datas_1)
display(df_datas_2)
display(df_datas_3)

datas
2024-07-05T10:00:00.000+0000
2023-12-05T00:09:00.000+0000
2017-02-23T16:23:00.000+0000


datas
2021-07-05 10:00
2020-12-05 00:09
2024-02-23 16:23


datas
05/07/2021
05/12/2020
23/02/2017


In [0]:
df_datas_1 = df_datas_1.withColumn("datas", to_timestamp("datas"))
display(df_datas_1)

df_datas_2 = df_datas_2.withColumn("datas", to_date("datas"))
display(df_datas_2)

df_datas_2 = df_datas_2.withColumn("datas", to_date("datas", "dd/MM/yyyy"))
display(df_datas_3)

datas
2024-07-05T10:00:00.000+0000
2023-12-05T00:09:00.000+0000
2017-02-23T16:23:00.000+0000


datas
2021-07-05
2020-12-05
2024-02-23


datas
05/07/2021
05/12/2020
23/02/2017


10 - Joins

In [0]:
df_carros_spark = df_carros.where(col("cod_marca") != "22").distinct()

df_carros_spark = df_carros_spark.join(
    df_marca,
    (df_carros_spark.cod_marca == df_marca.cod_marca),
    "inner"
)
df_carros_spark.limit(10).display()


df_carros_spark = df_carros.where(col("cod_marca") != "22").distinct()

df_carros_spark = df_carros_spark.join(
    df_marca,
    (df_carros_spark.cod_marca == df_marca.cod_marca),
    "left"
)
df_carros_spark.limit(10).display()


df_carros_spark = df_carros.where(col("cod_marca") != "22").distinct()

df_carros_spark = df_carros_spark.join(
    df_marca,
    (df_carros_spark.cod_marca == df_marca.cod_marca),
    "right"
).select(
    df_carros_spark["*"],
    df_marca.marca_carro
)




df_carros_spark.limit(10).display()


id_carro,modelo_carro,preco,cod_marca,marca_carro,cod_marca.1
258,350Z,66988.47,40,Nissan,40
315,Sunfire,90444.0,43,Pontiac,43
336,S8,82025.61,3,Audi,3
620,DeVille,90011.48,9,Cadillac,9
129,IPL G,52861.75,23,Infiniti,23
166,Aveo,45082.42,10,Chevrolet,10
256,A8,68321.65,3,Audi,3
394,5 Series,59419.97,5,BMW,5
924,S10 Blazer,61391.36,10,Chevrolet,10
348,Escort,74133.01,17,Ford,17


id_carro,modelo_carro,preco,cod_marca,marca_carro,cod_marca.1
5,Escort,77466.89,17,Ford,17
2,RDX,95987.38,1,Acura,1
1,Avalon,78401.95,54,Toyota,54
8,Civic,84749.22,20,Honda,20
9,Defender,98600.79,29,Land Rover,29
4,EX,84981.12,23,Infiniti,23
7,Voyager,95567.75,42,Plymouth,42
6,Expedition,84698.71,17,Ford,17
10,V8 Vantage S,94791.61,2,Aston Martin,2
3,Golf,61274.55,55,Volkswagen,55


id_carro,modelo_carro,preco,cod_marca,marca_carro
646,SLX,76518.37,1,Acura
852,Integra,54434.81,1,Acura
253,RSX,84861.4,1,Acura
638,Legend,47400.94,1,Acura
561,NSX,51341.45,1,Acura
343,SLX,67665.95,1,Acura
997,CL,81133.01,1,Acura
131,Integra,79011.63,1,Acura
60,RL,60125.97,1,Acura
601,NSX,63519.05,1,Acura


11 - Nulos

In [0]:
df_carros_spark = df_carros.where(
  col("cod_marca").isNotNull()
)

df_carros_spark.limit(10).display()

id_carro,modelo_carro,preco,cod_marca
3,Golf,61274.55,55
4,EX,84981.12,23
7,Voyager,95567.75,42
2,RDX,95987.38,1
8,Civic,84749.22,20
5,Escort,77466.89,17
1,Avalon,78401.95,54
6,Expedition,84698.71,17
9,Defender,98600.79,29
10,V8 Vantage S,94791.61,2


12 - Funcões de Agregação


In [0]:
df_carros_spark = df_carros.groupBy(
    "modelo_carro"
).agg(
    sum("preco").alias("Soma de Preco"),
    max("preco").alias("Preco MAX"),
    min("preco").alias("Preco MIN"),
    count("modelo_carro").alias("Quantidade")
)

df_carros_spark.limit(10).display()

modelo_carro,Soma de Preco,Preco MAX,Preco MIN,Quantidade
100,55023.26,55023.26,55023.26,1
1000,82354.14,82354.14,82354.14,1
1500,115833.18,70293.96,45539.22,2
1500 Club Coupe,73878.68,73878.68,73878.68,1
200SX,76179.86,76179.86,76179.86,1
240,52885.44,52885.44,52885.44,1
240SX,256826.05,94383.87,74588.41,3
2500,503800.18,94757.8,45557.06,7
2500 Club Coupe,68796.89,68796.89,68796.89,1
2CV,59625.57,59625.57,59625.57,1


Row Number

In [0]:
df_carros_spark = df_carros.withColumn(
    "row_number", row_number().over(Window.partitionBy("modelo_carro").orderBy("preco"))
)

df_carros_spark.limit(10).display()

id_carro,modelo_carro,preco,cod_marca,row_number
117,100,55023.26,3,1
71,1000,82354.14,43,1
634,1500,45539.22,18,1
57,1500,70293.96,10,2
882,1500 Club Coupe,73878.68,18,1
690,200SX,76179.86,40,1
171,240,52885.44,56,1
891,240SX,74588.41,40,1
136,240SX,87853.77,40,2
283,240SX,94383.87,40,3
