**Utilitários**

In [0]:
## DBFS utilities (dbutils) reference:
## https://docs.databricks.com/data/databricks-file-system.html?_ga=2.69751760.1755815213.1614473482-1952423036.1614199515#dbutils
#dbutils.help()
## Delete a single file from DBFS
#dbutils.fs.rm("/FileStore/tables/arquivos_gravados/arquivo_rdd.txt")
## Delete a directory together with its subdirectories and files
#dbutils.fs.rm("/FileStore/tables/arquivos_gravados/",True)
## List the contents of a DBFS directory
display(dbutils.fs.ls("/FileStore/tables"))

path,name,size
dbfs:/FileStore/tables/Mall_Customers.csv,Mall_Customers.csv,4286
dbfs:/FileStore/tables/RAW_ZONE_ORC/,RAW_ZONE_ORC/,0
dbfs:/FileStore/tables/RAW_ZONE_PARQUET/,RAW_ZONE_PARQUET/,0
dbfs:/FileStore/tables/adult_data.csv,adult_data.csv,5608318
dbfs:/FileStore/tables/arquivo_rdd.txt,arquivo_rdd.txt,355
dbfs:/FileStore/tables/cap_2/,cap_2/,0
dbfs:/FileStore/tables/country_vaccinations.csv,country_vaccinations.csv,880275
dbfs:/FileStore/tables/country_vaccinations.json,country_vaccinations.json,2742083
dbfs:/FileStore/tables/d1995_07_01_24d0c.json,d1995_07_01_24d0c.json,57242
dbfs:/FileStore/tables/d1995_07_02_24d0c.csv,d1995_07_02_24d0c.csv,48246


**Criando pipelines de dados eficientes - Parte 2 (Spark SQL e PySpark)**

**PRÁTICA**

In [0]:
# Path - plain-text file later read as an RDD
path_rdd = "/FileStore/tables/arquivo_rdd.txt"

# Path - dataset1 (country vaccinations CSV)
path_dataset1 = "/FileStore/tables/country_vaccinations.csv"

In [0]:
# Reading a CSV file into a DataFrame

## Option 1
# Read a .csv file whose first line is the header, without inferring column types
#df1 = spark.read.format("csv").option("header","true").option("inferSchema","false").load(path_dataset1)

## Option 2
# df1 = spark.read.csv(path_dataset1)
# Read a .csv file with "," as separator, first line as header, inferring column types
df1 = (
    spark.read
    .option("header", "true")
    .option("sep", ",")
    .option("inferSchema", "true")
    .csv(path_dataset1)
)

## Show the first 5 rows
df1.show(5)
# column names
# df1.columns
# column names together with their data types
# df1.dtypes

In [0]:
# Ingest df1 into the RAW ZONE as Parquet, replacing whatever the directory held before
(
    df1.write
    .format("parquet")
    .mode("overwrite")
    .save("/FileStore/tables/RAW_ZONE_PARQUET/")
)
# Alternative: repartition(4) asks for 4 partitions before writing
#df1.repartition(4).write.format("parquet").mode("overwrite").save("/FileStore/tables/RAW_ZONE_PARQUET/")
# Alternative: coalesce(3) asks for 3 partitions before writing
#df1.coalesce(3).write.format("parquet").mode("overwrite").save("/FileStore/tables/RAW_ZONE_PARQUET/")

In [0]:
## List what the Parquet write left in the RAW ZONE directory on DBFS
display(dbutils.fs.ls("/FileStore/tables/RAW_ZONE_PARQUET"))

path,name,size
dbfs:/FileStore/tables/RAW_ZONE_PARQUET/_committed_135354252739127191,_committed_135354252739127191,518
dbfs:/FileStore/tables/RAW_ZONE_PARQUET/_committed_1433377813572746985,_committed_1433377813572746985,221
dbfs:/FileStore/tables/RAW_ZONE_PARQUET/_committed_1599770651624670084,_committed_1599770651624670084,518
dbfs:/FileStore/tables/RAW_ZONE_PARQUET/_committed_1688406661277726661,_committed_1688406661277726661,331
dbfs:/FileStore/tables/RAW_ZONE_PARQUET/_committed_259648690944409021,_committed_259648690944409021,1007
dbfs:/FileStore/tables/RAW_ZONE_PARQUET/_committed_2943028427495255682,_committed_2943028427495255682,518
dbfs:/FileStore/tables/RAW_ZONE_PARQUET/_committed_5653985938469381856,_committed_5653985938469381856,123
dbfs:/FileStore/tables/RAW_ZONE_PARQUET/_committed_6801430166467076667,_committed_6801430166467076667,710
dbfs:/FileStore/tables/RAW_ZONE_PARQUET/_committed_7488681158009854653,_committed_7488681158009854653,518
dbfs:/FileStore/tables/RAW_ZONE_PARQUET/_committed_8594716186931616239,_committed_8594716186931616239,617


In [0]:
# Location of the JSON version of the vaccinations dataset
path_ds_json = "/FileStore/tables/country_vaccinations.json"

In [0]:
# Build a DataFrame from a multi-line JSON file. With primitivesAsString enabled the
# reader does not infer numeric/boolean types: primitive values are loaded as strings.
df_json = (
    spark.read
    .option("multiline", "true")
    .option("primitivesAsString", "true")
    .json(path_ds_json)
)

In [0]:
## Show the first 5 rows of the JSON DataFrame
df_json.show(5)
# column names
#df_json.columns
# column names together with their data types
#df_json.dtypes

In [0]:
# Path - Parquet dataset (the RAW ZONE directory written earlier in this notebook).
# The whole directory is read instead of one hardcoded part file: part-file names
# embed a per-write transaction id, so a fixed file name stops existing as soon as
# the directory is overwritten again.
path_ds_parquet = "/FileStore/tables/RAW_ZONE_PARQUET/"

# Build a DataFrame from Parquet. Parquet files carry their own schema, and
# primitivesAsString is a JSON reader option, so no reader options are passed here.
df_parquet = spark.read.parquet(path_ds_parquet)

# column names together with their data types
df_parquet.dtypes

In [0]:
# Ingest df1 into the RAW ZONE as ORC
#df1.write.format("orc").mode("overwrite").save("/FileStore/tables/RAW_ZONE_ORC/")
# repartition(4) asks for 4 partitions before writing; existing contents are replaced
(
    df1.repartition(4)
    .write
    .format("orc")
    .mode("overwrite")
    .save("/FileStore/tables/RAW_ZONE_ORC/")
)

In [0]:
## List what the ORC write left in the RAW ZONE directory on DBFS
display(dbutils.fs.ls("/FileStore/tables/RAW_ZONE_ORC"))

path,name,size
dbfs:/FileStore/tables/RAW_ZONE_ORC/_SUCCESS,_SUCCESS,0
dbfs:/FileStore/tables/RAW_ZONE_ORC/_committed_4394697554045526432,_committed_4394697554045526432,408
dbfs:/FileStore/tables/RAW_ZONE_ORC/_committed_5693542854057174514,_committed_5693542854057174514,779
dbfs:/FileStore/tables/RAW_ZONE_ORC/_committed_952202727982133881,_committed_952202727982133881,794
dbfs:/FileStore/tables/RAW_ZONE_ORC/_committed_vacuum8488910793784613109,_committed_vacuum8488910793784613109,96
dbfs:/FileStore/tables/RAW_ZONE_ORC/_started_5693542854057174514,_started_5693542854057174514,0
dbfs:/FileStore/tables/RAW_ZONE_ORC/_started_952202727982133881,_started_952202727982133881,0
dbfs:/FileStore/tables/RAW_ZONE_ORC/part-00000-tid-5693542854057174514-a2c215f7-9b6c-4cda-a5e0-909e626dfff9-37-1-c000.snappy.orc,part-00000-tid-5693542854057174514-a2c215f7-9b6c-4cda-a5e0-909e626dfff9-37-1-c000.snappy.orc,48704
dbfs:/FileStore/tables/RAW_ZONE_ORC/part-00001-tid-5693542854057174514-a2c215f7-9b6c-4cda-a5e0-909e626dfff9-38-1-c000.snappy.orc,part-00001-tid-5693542854057174514-a2c215f7-9b6c-4cda-a5e0-909e626dfff9-38-1-c000.snappy.orc,47795
dbfs:/FileStore/tables/RAW_ZONE_ORC/part-00002-tid-5693542854057174514-a2c215f7-9b6c-4cda-a5e0-909e626dfff9-39-1-c000.snappy.orc,part-00002-tid-5693542854057174514-a2c215f7-9b6c-4cda-a5e0-909e626dfff9-39-1-c000.snappy.orc,47914


In [0]:
# Path - ORC dataset
# the glob matches every 'part-*' file in the directory
path_ds_orc = "/FileStore/tables//RAW_ZONE_ORC/part-*"
# Build a DataFrame from the ORC files
df_orc = spark.read.orc(path_ds_orc)

# count and print the number of rows in each DataFrame
print("Quantidade de registros 'df_parquet':", df_parquet.count())
print("Quantidade de registros 'df_orc':", df_orc.count())

In [0]:
## Show the column names and data types of df1

df1.dtypes
#df1.printSchema()

In [0]:
# Reading a text file as an RDD

rdd = sc.textFile(path_rdd)
#rdd.show() = Wrong: show() exists on DataFrames only, not on RDDs
# collect() returns the file as a list in which each line of the file is one item
rdd.collect()

In [0]:
# Read the same text file with the CSV reader instead of as an RDD
# (per the original author's note, the positional layout / blank padding is lost this way)
dfff = spark.read.format("csv").load(path_rdd)
display(dfff)

_c0
00001REFRIGERANTE LATA COCA-COLA
00002CERVEJA LATA BRAHMA
00003ENERGÉTICO GARRAFA 510ml REDBULL
00004CERVEJA GARRAFA 600ml ANTÁTICA
00005REFRIGERANTE PET 2lt COCA-COLA


In [0]:
# take(n) returns the requested number of rows as a Python object;
# each row is rendered as field=value pairs
df1.take(2)

In [0]:
# Register df1 as a temporary view so that it can be queried by name with SQL,
# like an in-memory table
nome_tabela_temporiaria = "tab_temp_vacinacao"
df1.createOrReplaceTempView(nome_tabela_temporiaria)

In [0]:
# Reading the temporary view, option 1: look it up by name and show 3 rows
spark.read.table(nome_tabela_temporiaria).show(3)

In [0]:
# Query the temporary view with SQL: number of rows per country
spark.sql("SELECT count(*) tt, country FROM tab_temp_vacinacao Group By country").show()

In [0]:
# Create a DataFrame from a SQL statement (row count per country)
dfvac = spark.sql("SELECT country, count(*) quant FROM tab_temp_vacinacao Group By country")
dfvac.show()

In [0]:
# Databricks visualization: display() renders the query result as a table
display(spark.sql("SELECT country, iso_code, count(*) quant FROM tab_temp_vacinacao Group By country, iso_code"))

country,iso_code,quant
Slovakia,SVK,54
Indonesia,IDN,46
France,FRA,61
Malta,MLT,40
Zimbabwe,ZWE,9
Australia,AUS,12
Qatar,QAT,59
Macao,MAC,11
Azerbaijan,AZE,21
Algeria,DZA,22


In [0]:
# Databricks visualization: display() output can also be switched to a chart.
# The query groups by the column alias `pais` and takes the highest total_vaccinations per country.
display(spark.sql("SELECT country pais, iso_code, max(total_vaccinations) nr_vacinados FROM tab_temp_vacinacao group by pais, iso_code"))

pais,iso_code,nr_vacinados
Slovakia,SVK,433998.0
Indonesia,IDN,2449451.0
France,FRA,4298573.0
Malta,MLT,73644.0
Zimbabwe,ZWE,12579.0
Australia,AUS,23504.0
Qatar,QAT,140000.0
Macao,MAC,2000.0
Azerbaijan,AZE,65000.0
Algeria,DZA,75000.0


In [0]:
# Scala equivalent of the import below
#import org.apache.spark.sql.functions._

# Python
from pyspark.sql.functions import col, column

# Select columns through the col / column functions (both are used in the same call here)
df1.select(col("country"), col("date"), column("iso_code")).show()

In [0]:
# Same three columns as the previous cell, selected through selectExpr with string expressions
df1.selectExpr("country", "date", "iso_code").show()

In [0]:
# Scala import
# org.apache.spark.sql.types._

# Building a schema by hand in PySpark
from pyspark.sql.types import *

# NOTE: despite its name, this variable holds a StructType (a schema), not a DataFrame.
dataframe_ficticio = StructType(
    [
        StructField("col_String_1", StringType()),
        StructField("col_Integer_2", IntegerType()),
        StructField("col_Decimal_3", DecimalType()),
    ]
)
dataframe_ficticio

In [0]:
# Função para gerar Schema (campos/colunas/nomes de colunas)

'''
# Scala

org.apache.spark.sql.types._

def getSchema(fields : Array[StructField]) : StructType = {
  new StructType(fields)
}
'''

# PySpark
def getSchema(fields):
  """Build a StructType schema from StructField definitions.

  Args:
    fields: iterable of StructField objects (list, tuple or any other iterable).
      None or an empty iterable yields an empty schema.

  Returns:
    A StructType containing the given fields in order.
  """
  # Copy into a fresh list so the schema does not share (and later mutate) the
  # caller's list, and so one-shot iterators are materialized exactly once.
  return StructType(list(fields) if fields is not None else [])
  
schema = getSchema([StructField("coluna1", StringType()), StructField("coluna2", StringType()), StructField("coluna3", StringType())])

In [0]:
# Display the schema object built by getSchema in the previous cell
schema

In [0]:
# Create the DataFrame used by the file-writing cells below (row count per country / iso_code)
dfgrav = spark.sql("SELECT country, iso_code, count(*) quant FROM tab_temp_vacinacao Group By country, iso_code")

In [0]:
# Write dfgrav in CSV format, using a tab as the field separator

path_destino = "/FileStore/tables/arquivos_gravados/CSV/"
#nome_arquivo="arquivo.csv"
#path_geral= path_destino + nome_arquivo
(
    dfgrav.write
    .format("csv")
    .mode("overwrite")
    .option("sep", "\t")
    .save(path_destino)
)

In [0]:
# Write dfgrav in JSON format

path_destino = "/FileStore/tables/arquivos_gravados/JSON/"
#nome_arquivo="arquivo.json"
#path_geral= path_destino + nome_arquivo
(
    dfgrav.write
    .format("json")
    .mode("overwrite")
    .save(path_destino)
)

In [0]:
# Write df1 (the full dataset, not the dfgrav aggregate) in Parquet format

path_destino = "/FileStore/tables/arquivos_gravados/PARQUET/"
#nome_arquivo="arquivo.parquet"
#path_geral= path_destino + nome_arquivo
(
    df1.write
    .format("parquet")
    .mode("overwrite")
    .save(path_destino)
)

In [0]:
# Write df1 (the full dataset, not the dfgrav aggregate) in ORC format

path_destino = "/FileStore/tables/arquivos_gravados/ORC/"
#nome_arquivo="arquivo.orc"
#path_geral= path_destino + nome_arquivo
(
    df1.write
    .format("orc")
    .mode("overwrite")
    .save(path_destino)
)

In [0]:
## List the files produced by the ORC write on DBFS
display(dbutils.fs.ls("/FileStore/tables/arquivos_gravados/ORC/"))

path,name,size
dbfs:/FileStore/tables/arquivos_gravados/ORC/_SUCCESS,_SUCCESS,0
dbfs:/FileStore/tables/arquivos_gravados/ORC/_committed_7571209665962204664,_committed_7571209665962204664,120
dbfs:/FileStore/tables/arquivos_gravados/ORC/_started_7571209665962204664,_started_7571209665962204664,0
dbfs:/FileStore/tables/arquivos_gravados/ORC/part-00000-tid-7571209665962204664-c4f9c6d5-72aa-49b5-b1d8-ac82b2685d38-204-1-c000.snappy.orc,part-00000-tid-7571209665962204664-c4f9c6d5-72aa-49b5-b1d8-ac82b2685d38-204-1-c000.snappy.orc,119022


In [0]:
# Other kinds of SELECT

# Different ways of selecting one column

# NOTE(review): wildcard import; later cells use lit, expr, upper and lower from it.
# pyspark.sql.functions also exports names such as sum/max/min that shadow Python
# builtins - consider switching to explicit imports.
from pyspark.sql.functions import *

df1.select("country").show(5)
df1.select('country').show(5)
df1.select(col("country")).show(5)
df1.select(column("country")).show(5)
df1.select(expr("country")).show(5)
# Attribute access yields the column object itself; nothing is selected or shown as rows here
df1.country

In [0]:
# Add a new column holding a constant value (lit(1)); df1 itself is left unchanged,
# which the two displays below make visible side by side
df2 = df1.withColumn("nova_coluna", lit(1))
display(df1)
display(df2)

country,iso_code,date,total_vaccinations,people_vaccinated,people_fully_vaccinated,daily_vaccinations_raw,daily_vaccinations,total_vaccinations_per_hundred,people_vaccinated_per_hundred,people_fully_vaccinated_per_hundred,daily_vaccinations_per_million,vaccines,source_name,source_website
Albania,ALB,2021-01-10,0.0,0.0,,,,0.0,0.0,,,Pfizer/BioNTech,Ministry of Health,https://shendetesia.gov.al/covid19-ministria-e-shendetesise-1031-te-vaksinuar-3691-testime-849-te-sheruar-986-raste-te-reja-dhe-21-humbje-jete-ne-24-oret-e-fundit/
Albania,ALB,2021-01-11,,,,,64.0,,,,22.0,Pfizer/BioNTech,Ministry of Health,https://shendetesia.gov.al/covid19-ministria-e-shendetesise-1031-te-vaksinuar-3691-testime-849-te-sheruar-986-raste-te-reja-dhe-21-humbje-jete-ne-24-oret-e-fundit/
Albania,ALB,2021-01-12,128.0,128.0,,,64.0,0.0,0.0,,22.0,Pfizer/BioNTech,Ministry of Health,https://shendetesia.gov.al/covid19-ministria-e-shendetesise-1031-te-vaksinuar-3691-testime-849-te-sheruar-986-raste-te-reja-dhe-21-humbje-jete-ne-24-oret-e-fundit/
Albania,ALB,2021-01-13,188.0,188.0,,60.0,63.0,0.01,0.01,,22.0,Pfizer/BioNTech,Ministry of Health,https://shendetesia.gov.al/covid19-ministria-e-shendetesise-1031-te-vaksinuar-3691-testime-849-te-sheruar-986-raste-te-reja-dhe-21-humbje-jete-ne-24-oret-e-fundit/
Albania,ALB,2021-01-14,266.0,266.0,,78.0,66.0,0.01,0.01,,23.0,Pfizer/BioNTech,Ministry of Health,https://shendetesia.gov.al/covid19-ministria-e-shendetesise-1031-te-vaksinuar-3691-testime-849-te-sheruar-986-raste-te-reja-dhe-21-humbje-jete-ne-24-oret-e-fundit/
Albania,ALB,2021-01-15,308.0,308.0,,42.0,62.0,0.01,0.01,,22.0,Pfizer/BioNTech,Ministry of Health,https://shendetesia.gov.al/covid19-ministria-e-shendetesise-1031-te-vaksinuar-3691-testime-849-te-sheruar-986-raste-te-reja-dhe-21-humbje-jete-ne-24-oret-e-fundit/
Albania,ALB,2021-01-16,369.0,369.0,,61.0,62.0,0.01,0.01,,22.0,Pfizer/BioNTech,Ministry of Health,https://shendetesia.gov.al/covid19-ministria-e-shendetesise-1031-te-vaksinuar-3691-testime-849-te-sheruar-986-raste-te-reja-dhe-21-humbje-jete-ne-24-oret-e-fundit/
Albania,ALB,2021-01-17,405.0,405.0,,36.0,58.0,0.01,0.01,,20.0,Pfizer/BioNTech,Ministry of Health,https://shendetesia.gov.al/covid19-ministria-e-shendetesise-1031-te-vaksinuar-3691-testime-849-te-sheruar-986-raste-te-reja-dhe-21-humbje-jete-ne-24-oret-e-fundit/
Albania,ALB,2021-01-18,447.0,447.0,,42.0,55.0,0.02,0.02,,19.0,Pfizer/BioNTech,Ministry of Health,https://shendetesia.gov.al/covid19-ministria-e-shendetesise-1031-te-vaksinuar-3691-testime-849-te-sheruar-986-raste-te-reja-dhe-21-humbje-jete-ne-24-oret-e-fundit/
Albania,ALB,2021-01-19,483.0,483.0,,36.0,51.0,0.02,0.02,,18.0,Pfizer/BioNTech,Ministry of Health,https://shendetesia.gov.al/covid19-ministria-e-shendetesise-1031-te-vaksinuar-3691-testime-849-te-sheruar-986-raste-te-reja-dhe-21-humbje-jete-ne-24-oret-e-fundit/


country,iso_code,date,total_vaccinations,people_vaccinated,people_fully_vaccinated,daily_vaccinations_raw,daily_vaccinations,total_vaccinations_per_hundred,people_vaccinated_per_hundred,people_fully_vaccinated_per_hundred,daily_vaccinations_per_million,vaccines,source_name,source_website,nova_coluna
Albania,ALB,2021-01-10,0.0,0.0,,,,0.0,0.0,,,Pfizer/BioNTech,Ministry of Health,https://shendetesia.gov.al/covid19-ministria-e-shendetesise-1031-te-vaksinuar-3691-testime-849-te-sheruar-986-raste-te-reja-dhe-21-humbje-jete-ne-24-oret-e-fundit/,1
Albania,ALB,2021-01-11,,,,,64.0,,,,22.0,Pfizer/BioNTech,Ministry of Health,https://shendetesia.gov.al/covid19-ministria-e-shendetesise-1031-te-vaksinuar-3691-testime-849-te-sheruar-986-raste-te-reja-dhe-21-humbje-jete-ne-24-oret-e-fundit/,1
Albania,ALB,2021-01-12,128.0,128.0,,,64.0,0.0,0.0,,22.0,Pfizer/BioNTech,Ministry of Health,https://shendetesia.gov.al/covid19-ministria-e-shendetesise-1031-te-vaksinuar-3691-testime-849-te-sheruar-986-raste-te-reja-dhe-21-humbje-jete-ne-24-oret-e-fundit/,1
Albania,ALB,2021-01-13,188.0,188.0,,60.0,63.0,0.01,0.01,,22.0,Pfizer/BioNTech,Ministry of Health,https://shendetesia.gov.al/covid19-ministria-e-shendetesise-1031-te-vaksinuar-3691-testime-849-te-sheruar-986-raste-te-reja-dhe-21-humbje-jete-ne-24-oret-e-fundit/,1
Albania,ALB,2021-01-14,266.0,266.0,,78.0,66.0,0.01,0.01,,23.0,Pfizer/BioNTech,Ministry of Health,https://shendetesia.gov.al/covid19-ministria-e-shendetesise-1031-te-vaksinuar-3691-testime-849-te-sheruar-986-raste-te-reja-dhe-21-humbje-jete-ne-24-oret-e-fundit/,1
Albania,ALB,2021-01-15,308.0,308.0,,42.0,62.0,0.01,0.01,,22.0,Pfizer/BioNTech,Ministry of Health,https://shendetesia.gov.al/covid19-ministria-e-shendetesise-1031-te-vaksinuar-3691-testime-849-te-sheruar-986-raste-te-reja-dhe-21-humbje-jete-ne-24-oret-e-fundit/,1
Albania,ALB,2021-01-16,369.0,369.0,,61.0,62.0,0.01,0.01,,22.0,Pfizer/BioNTech,Ministry of Health,https://shendetesia.gov.al/covid19-ministria-e-shendetesise-1031-te-vaksinuar-3691-testime-849-te-sheruar-986-raste-te-reja-dhe-21-humbje-jete-ne-24-oret-e-fundit/,1
Albania,ALB,2021-01-17,405.0,405.0,,36.0,58.0,0.01,0.01,,20.0,Pfizer/BioNTech,Ministry of Health,https://shendetesia.gov.al/covid19-ministria-e-shendetesise-1031-te-vaksinuar-3691-testime-849-te-sheruar-986-raste-te-reja-dhe-21-humbje-jete-ne-24-oret-e-fundit/,1
Albania,ALB,2021-01-18,447.0,447.0,,42.0,55.0,0.02,0.02,,19.0,Pfizer/BioNTech,Ministry of Health,https://shendetesia.gov.al/covid19-ministria-e-shendetesise-1031-te-vaksinuar-3691-testime-849-te-sheruar-986-raste-te-reja-dhe-21-humbje-jete-ne-24-oret-e-fundit/,1
Albania,ALB,2021-01-19,483.0,483.0,,36.0,51.0,0.02,0.02,,18.0,Pfizer/BioNTech,Ministry of Health,https://shendetesia.gov.al/covid19-ministria-e-shendetesise-1031-te-vaksinuar-3691-testime-849-te-sheruar-986-raste-te-reja-dhe-21-humbje-jete-ne-24-oret-e-fundit/,1


In [0]:
# Add a column: boolean expression, only used by the commented-out example below
teste = expr("total_vaccinations < 40")
#df1.select("country", "total_vaccinations").withColumn("teste", teste).show(5)


# Rename a column - three equivalent approaches (SQL "as", alias(), withColumnRenamed())
df1.select(expr("total_vaccinations as total_de_vacinados")).show(5)
df1.select(col("country").alias("pais")).show(5)
df1.select("country").withColumnRenamed("country", "pais").show(5)

# Remove a column: df3 is df1 without "country"
df3 = df1.drop("country")
df3.columns


In [0]:
# Filtering and sorting data
# where() is an alias for filter().

# Rows whose total_vaccinations exceeds 55, ordered by that column; show the first 2
(
    df1
    .filter(df1.total_vaccinations > 55)
    .orderBy(df1.total_vaccinations)
    .show(2)
)

# Keep only rows where the country equals Argentina
(
    df1
    .select(df1.total_vaccinations, df1.country)
    .filter(df1.country == "Argentina")
    .show(5)
)

# Keep only rows where the country differs from Argentina
(
    df1
    .select(df1.total_vaccinations, df1.country)
    .where(df1.country != "Argentina")
    .show(5)
)  # python type

# Distinct values of a column
df1.select("country").distinct().show()

# Several filters, each defined as its own expression
filtro_vacinas = df1.total_vaccinations < 100
filtro_pais = df1.country.contains("Argentina")
(
    df1
    .select(df1.total_vaccinations, df1.country, df1.vaccines)
    .where(df1.vaccines.isin("Sputnik V", "Sinovac"))
    .filter(filtro_vacinas)
    .show(5)
)
# Same selection, with filtro_pais added as a boolean column instead of being applied as a filter
(
    df1
    .select(df1.total_vaccinations, df1.country, df1.vaccines)
    .where(df1.vaccines.isin("Sputnik V", "Sinovac"))
    .filter(filtro_vacinas)
    .withColumn("filtro_pais", filtro_pais)
    .show(5)
)



In [0]:
"""#######################################################################################################################
Convertendo dados
#######################################################################################################################"""

# Add a column "PAISSSSS" holding `country` cast to string. withColumn names the new
# column from its first argument, so no .alias() is needed on the expression itself.
df5 = df1.withColumn("PAISSSSS", col("country").cast("string"))
df5.select(df5.PAISSSSS).show(2)

"""#######################################################################################################################
Trabalhando com funções
#######################################################################################################################"""

# Using built-in column functions: upper-case and lower-case versions of `country`
df1.select(upper(df1.country)).show(3)
df1.select(lower(df1.country)).show(4)

In [0]:
# Create a small DataFrame from in-memory Python data (a list holding one dict)

d = [{'name': 'Alice', 'age': 1}]
df_A = spark.createDataFrame(d)
df_A.show()

In [0]:
# Sample people records. Despite the rdd* names these are plain Python lists of
# dicts; "status" is stored as the strings 'true' / 'false'.
rdd1 = [
    {"nome": "Marco", "idade": 33, "status": "true"},
    {"nome": "Antonio", "idade": 33, "status": "true"},
    {"nome": "Pereira", "idade": 33, "status": "true"},
    {"nome": "Helena", "idade": 30, "status": "true"},
    {"nome": "Fernando", "idade": 35, "status": "true"},
    {"nome": "Carlos", "idade": 28, "status": "true"},
    {"nome": "Lisa", "idade": 26, "status": "true"},
    {"nome": "Candido", "idade": 75, "status": "false"},
    {"nome": "Vasco", "idade": 62, "status": "true"},
]
dff1 = spark.createDataFrame(rdd1)
dff1.show()


# Country of origin per person; only some names also appear in rdd1
rdd2 = [
    {"nome": "Marco", "PaisOrigem": "Brasil"},
    {"nome": "Helena", "PaisOrigem": "Brasil"},
    {"nome": "Gabriel", "PaisOrigem": "Brasil"},
    {"nome": "Vasco", "PaisOrigem": "Portugal"},
    {"nome": "Medhi", "PaisOrigem": "Marocco"},
]

dff2 = spark.createDataFrame(rdd2)
dff2.show()

'''
join_type = "inner"

+------+-----+------+------+----------+
|  nome|idade|status|  nome|PaisOrigem|
+------+-----+------+------+----------+
| Vasco|   62|  true| Vasco|  Portugal|
| Marco|   33|  true| Marco|    Brasil|
|Helena|   30|  true|Helena|    Brasil|
+------+-----+------+------+----------+
'''

'''val join_type = "left_semi"

+------+-----+------+
|  nome|idade|status|
+------+-----+------+
| Vasco|   62|  true|
| Marco|   33|  true|
|Helena|   30|  true|
+------+-----+------+
'''

'''val join_type = "right_outer"

+------+-----+------+-------+----------+
|  nome|idade|status|   nome|PaisOrigem|
+------+-----+------+-------+----------+
| Vasco|   62|  true|  Vasco|  Portugal|
| Marco|   33|  true|  Marco|    Brasil|
|  null| null|  null|Gabriel|    Brasil|
|Helena|   30|  true| Helena|    Brasil|
|  null| null|  null|  Medhi|   Marocco|
+------+-----+------+-------+----------+
'''

'''val join_type = "left_outer"

+--------+-----+------+------+----------+
|    nome|idade|status|  nome|PaisOrigem|
+--------+-----+------+------+----------+
| Antonio|   33|  true|  null|      null|
|   Vasco|   62|  true| Vasco|  Portugal|
|   Marco|   33|  true| Marco|    Brasil|
| Pereira|   33|  true|  null|      null|
|  Carlos|   28|  true|  null|      null|
|Fernando|   35|  true|  null|      null|
| Candido|   75| false|  null|      null|
|  Helena|   30|  true|Helena|    Brasil|
|    Lisa|   26|  true|  null|      null|
+--------+-----+------+------+----------+
'''

'''join_type = "full_outer"

+--------+-----+------+-------+----------+
|    nome|idade|status|   nome|PaisOrigem|
+--------+-----+------+-------+----------+
| Antonio|   33|  true|   null|      null|
|   Vasco|   62|  true|  Vasco|  Portugal|
|   Marco|   33|  true|  Marco|    Brasil|
| Pereira|   33|  true|   null|      null|
|  Carlos|   28|  true|   null|      null|
|    null| null|  null|Gabriel|    Brasil|
|Fernando|   35|  true|   null|      null|
| Candido|   75| false|   null|      null|
|  Helena|   30|  true| Helena|    Brasil|
|    Lisa|   26|  true|   null|      null|
|    null| null|  null|  Medhi|   Marocco|
+--------+-----+------+-------+----------+
'''

'''join_type = "left_anti"

+--------+-----+------+
|    nome|idade|status|
+--------+-----+------+
| Antonio|   33|  true|
| Pereira|   33|  true|
|  Carlos|   28|  true|
|Fernando|   35|  true|
| Candido|   75| false|
|    Lisa|   26|  true|
+--------+-----+------+
'''

# Join the two sample DataFrames on equal `nome`; change join_type to try the other join kinds.
# NOTE: this rebinds df3, which an earlier cell assigned to df1.drop("country").
join_type = "inner"
join_condition = dff1.nome == dff2.nome
df3 = dff1.join(dff2, on=join_condition, how=join_type)
df3.show()

# Distinct ages per status (uses dff1: df1 has no status/idade columns)
#dff1.groupBy("status").agg(countDistinct(col("idade"))).show()