# Big Data para Cientista de Dados

In [0]:
# Lendo o arquivo de dados
arquivo = "/FileStore/tables/bronze/2015_summary-1.csv"

In [0]:
# lendo o arquivo de dados
# inferSchema = True
# header = True

flightData2015 = spark\
.read.format("csv")\
.option("inferSchema", "True")\
.option("header", "True")\
.csv(arquivo)

In [0]:
# imprime os datatypes das colunas do dataframe
flightData2015.printSchema()

In [0]:
# imprime o tipo da variável flightData2015
type(flightData2015)

In [0]:
# retorna as primeiras 3 linhas do dataframe em formato de array.
flightData2015.take(5)

In [0]:
# Usando o comando display
display(flightData2015.show(3))

In [0]:
# imprime a quantidade de linhas no dataframe.
flightData2015.count()

In [0]:
# lendo o arquivo previamente com a opção inferSchema desligada
flightData2015 = spark\
.read\
.option("inferSchema", "True")\
.option("header", "True")\
.csv(arquivo)

In [0]:
df = spark\
.read\
.option("inferSchema", "True")\
.option("header", "True")\
.csv("/FileStore/tables/bronze2/*.csv")

In [0]:
df.show(10)

In [0]:
# imprime a quantidade de linhas do datafrme
df.count()

In [0]:
# Opções de Plots
display(df.head(10))

# Trabalhando com SQL

In [0]:
%sql
DROP TABLE IF EXISTS all_files;

In [0]:
%sql
-- Create a table over every CSV file in bronze2.
-- inferSchema makes `count` numeric instead of a string, matching the `df` DataFrame read from the same files.
CREATE TABLE all_files
USING csv
OPTIONS (path "/FileStore/tables/bronze2/*.csv", header "true", inferSchema "true")

In [0]:
%sql
-- Query the data using SQL.
SELECT * FROM all_files;

In [0]:
%sql
-- Count the rows using SQL.
SELECT count(*) FROM all_files;

In [0]:
%sql
-- Average of the count column per destination country.
SELECT DEST_COUNTRY_NAME
       ,avg(count) AS media_count
FROM all_files
GROUP BY DEST_COUNTRY_NAME
ORDER BY DEST_COUNTRY_NAME;

In [0]:
# Register df as a temporary view so the SQL cells can query it by name.
nome_view = "2015_summary_csv"
df.createOrReplaceTempView(nome_view)

In [0]:
%sql
SELECT *
FROM `2015_summary_csv`

In [0]:
%sql
-- Query the 2015_summary_csv view, multiplying count by ten.
SELECT
  DEST_COUNTRY_NAME,
  ORIGIN_COUNTRY_NAME,
  count * 10 AS count_multiplicado_por_dez
FROM `2015_summary_csv`

In [0]:
from pyspark.sql.functions import max
df.select(max("count")).take(1)

In [0]:
# Filtrando linhas de um dataframe usando filter
df.filter("count < 2").show(2)

In [0]:
# Usando where (um alias para o metodo filter)
df.where("count < 2").show(2)

In [0]:
%sql
-- Filter rows with SQL: at most 2 rows whose count is below 2.
SELECT *
FROM `2015_summary_csv`
WHERE count < 2
LIMIT 2

In [0]:
# Count the unique (origin, destination) pairs.
pares = df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME")
pares.distinct().count()

### Manipulando Dataframes

In [0]:
df.sort("count").show(5)

In [0]:
from pyspark.sql.functions import desc, asc, expr
# ordenando por ordem crescente
df.orderBy(expr("count desc")).show(10)

In [0]:
# visualizando estatísticas descritivas
df.describe().show()

In [0]:
# iterando sobre todas as linhas do dataframe
for i in df.collect():
  #print (i)
  print(i[0], i[1], i[2] * 2)

In [0]:
from pyspark.sql.functions import lower, upper, col
df.select(col("DEST_COUNTRY_NAME"),lower(col("DEST_COUNTRY_NAME")),upper(lower(col("DEST_COUNTRY_NAME")))).show(10)

In [0]:
%sql
-- The same case conversions written in SQL.
SELECT
  DEST_COUNTRY_NAME,
  lower(DEST_COUNTRY_NAME),
  Upper(DEST_COUNTRY_NAME)
FROM `2015_summary_csv`

In [0]:
# remove espaços em branco a esquerda
from pyspark.sql.functions import ltrim
df.select(ltrim(col("DEST_COUNTRY_NAME"))).show(2)

In [0]:
# remove espaços a direita
from pyspark.sql.functions import rtrim
df.select(rtrim(col("DEST_COUNTRY_NAME"))).show(2)

In [0]:
# todas as operações juntas..
# a função lit cria uma coluna na cópia do dataframe
from pyspark.sql.functions import lit, ltrim, rtrim, rpad, lpad, trim
df.select(
ltrim(lit(" HELLO ")).alias("ltrim"),
rtrim(lit(" HELLO ")).alias("rtrim"),
trim(lit(" HELLO ")).alias("trim"),
lpad(lit("HELLO"), 3, " ").alias("lp"),
rpad(lit("HELLO"), 10, " ").alias("rp")).show(2)

In [0]:
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)

In [0]:
from pyspark.sql.functions import desc, asc
df.orderBy(expr("count desc")).show(2)
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)

In [0]:
# utilizando SQL
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM 2015_summary_csv
GROUP BY DEST_COUNTRY_NAME
""")

In [0]:
# Utilizando Python
dataFrameWay = df.groupBy("DEST_COUNTRY_NAME").count()

In [0]:
# imprime o plano de execução do código
sqlWay.explain()

In [0]:
# imprime o plano de execução do código
dataFrameWay.explain()

In [0]:
df = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("/FileStore/tables/bronze/2010_12_01.csv")


In [0]:
#imprime as  10 primeiras linhas
display(df.head(10))

In [0]:
# Tipos Boleanos
from pyspark.sql.functions import col
df.where(col("InvoiceNo") != 536365)\
.select("InvoiceNo", "Description")\
.show(5)

In [0]:
# cria a tabela temporária dftrable
df.createOrReplaceTempView("dfTable")

In [0]:
# imprime 10 primeiras linhas
display(df.head(10))

In [0]:
# usando o operador boleando com um predicado em uma expressão.
df.where("InvoiceNo <> 536365").show(5)

In [0]:
# usando o operador boleando com um predicado em uma expressão.
df.where("InvoiceNo = 536365").show(5)

In [0]:
# Entendendo a ordem dos operadores boleanos
from pyspark.sql.functions import instr
priceFilter = col("UnitPrice") > 600
descripFilter = instr(df.Description, "POSTAGE") >= 1

In [0]:
# aplicando os operadores como filtros
df.where(df.StockCode.isin("DOT")).where(priceFilter | descripFilter).show()

In [0]:
%sql
-- The same filter written in SQL.
SELECT *
FROM dfTable
WHERE StockCode IN ("DOT")
  AND (UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1)

In [0]:
# Combinando filtros e operadores boleanos
from pyspark.sql.functions import instr
DOTCodeFilter = col("StockCode") == "DOT"
priceFilter = col("UnitPrice") > 600
descripFilter = instr(col("Description"), "POSTAGE") >= 1


In [0]:
# Combinando filtros e operadores boleanos
df.withColumn("isExpensive", DOTCodeFilter & (priceFilter | descripFilter))\
.where("isExpensive")\
.select("unitPrice", "isExpensive").show(5)

In [0]:
%sql
-- The same idea in SQL: flag expensive DOT rows and keep only those.
SELECT
  UnitPrice,
  (StockCode = 'DOT' AND (UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1)) AS isExpensive
FROM dfTable
WHERE StockCode = 'DOT'
  AND (UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1)

# Trabalhando com tipos diferentes de arquivos

### Modos de leitura
- **permissive**: *Define como NULL os campos que não consegue interpretar; se o schema tiver uma coluna `_corrupt_record` (nome padrão da opção `columnNameOfCorruptRecord`), o registro corrompido original é guardado nela.* (default)

- **dropMalformed**: *Descarta as linhas corrompidas ou que o Spark não consiga interpretar.*

- **failFast**: *Falha imediatamente quando encontra uma linha que não consiga ler.*

In [0]:
# Lendo arquivos csv
spark.read.format("csv")
.option("mode", "permissive")
.option("inferSchema", "true")
.option("path", "path/to/file(s)")
.schema(someSchema)
.load()

In [0]:
# leia o arquivo alterando os modos de leitura (failfast, permissive, dropmalformed)
df = spark.read.format("csv")\
.option("mode", "failfast")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("/FileStore/tables/bronze/2010_12_01.csv")

In [0]:
# imprime as 10 primeiras linhas do dataframe
display(df.head(10))

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01 08:26:00,2.55,17850.0,United Kingdom
536365,71053,WHITE METAL LANTERN,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01 08:26:00,2.75,17850.0,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,2010-12-01 08:26:00,7.65,17850.0,United Kingdom
536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,2010-12-01 08:26:00,4.25,17850.0,United Kingdom
536366,22633,HAND WARMER UNION JACK,6,2010-12-01 08:28:00,1.85,17850.0,United Kingdom
536366,22632,HAND WARMER RED POLKA DOT,6,2010-12-01 08:28:00,1.85,17850.0,United Kingdom
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,2010-12-01 08:34:00,1.69,13047.0,United Kingdom


#### Criando um schema
- A opção **inferSchema** nem sempre vai definir o melhor datatype.
- Definir o schema explicitamente melhora a performance na leitura de grandes bases (evita a passagem extra sobre os dados usada para inferir os tipos).
- Permite uma customização dos tipos das colunas.
- É importante saber para reescrita de aplicações. (Códigos pandas)

In [0]:
# imprime o schema do dataframe (infer_schema=True)
df.printSchema()

In [0]:
# usa o objeto StructType
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType, TimestampType
schema_df = StructType([
    StructField("InvoiceNo", IntegerType()),
    StructField("StockCode", IntegerType()),
    StructField("Description", StringType()),
    StructField("Quantity", IntegerType()),
    StructField("InvoiceDate", TimestampType()),
    StructField("UnitPrice", DoubleType()),
    StructField("CustomerID", DoubleType()),
    StructField("Country", StringType())
])

In [0]:
# verificando o tipo da variável schema_df
type(schema_df)

In [0]:
# usando o parâmetro schema()
df = spark.read.format("csv")\
.option("header", "True")\
.schema(schema_df)\
.option("timestampFormat",'yyyy-/MM/DD hh:mm:ss')\
.load("/FileStore/tables/bronze/2010_12_01.csv")

In [0]:
# imprime o schema do dataframe.
df.printSchema()

In [0]:
# imprime 10 primeiras linhas do dataframe.
display(df.collect())

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365.0,,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01T08:26:00.000+0000,2.55,17850.0,United Kingdom
536365.0,71053.0,WHITE METAL LANTERN,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365.0,,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01T08:26:00.000+0000,2.75,17850.0,United Kingdom
536365.0,,KNITTED UNION FLAG HOT WATER BOTTLE,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365.0,,RED WOOLLY HOTTIE WHITE HEART.,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365.0,22752.0,SET 7 BABUSHKA NESTING BOXES,2,2010-12-01T08:26:00.000+0000,7.65,17850.0,United Kingdom
536365.0,21730.0,GLASS STAR FROSTED T-LIGHT HOLDER,6,2010-12-01T08:26:00.000+0000,4.25,17850.0,United Kingdom
536366.0,22633.0,HAND WARMER UNION JACK,6,2010-12-01T08:28:00.000+0000,1.85,17850.0,United Kingdom
536366.0,22632.0,HAND WARMER RED POLKA DOT,6,2010-12-01T08:28:00.000+0000,1.85,17850.0,United Kingdom
536367.0,84879.0,ASSORTED COLOUR BIRD ORNAMENT,32,2010-12-01T08:34:00.000+0000,1.69,13047.0,United Kingdom


### Arquivos JSON

In [0]:
df_json = spark.read.format("json")\
.option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.load("/FileStore/tables/bronze/2010_summary.json")

In [0]:
df_json.printSchema()

In [0]:
display(df_json.head(10))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,1
United States,Ireland,264
United States,India,69
Egypt,United States,24
Equatorial Guinea,United States,1
United States,Singapore,25
United States,Grenada,54
Costa Rica,United States,477
Senegal,United States,29
United States,Marshall Islands,44


### Escrevendo arquivos
- **append** : Adiciona os arquivos de saída aos arquivos que já existem no destino.
- **overwrite** : Sobrescreve os dados existentes no destino.
- **errorIfExists** : Lança um erro e interrompe a escrita se já existirem dados no destino (default).
- **ignore** : Se já existirem dados no destino, não faz nada (a escrita é ignorada).

In [0]:
# escrevendo arquivos csv
df.write.format("csv")\
.mode("overwrite") \
.option("sep", ",") \
.save("/FileStore/tables/bronze/saida_2010_12_01.csv")

In [0]:
# observe o arquivo gerado.
file = "/FileStore/tables/bronze/saida_2010_12_01.csv/part-00000-tid-513137111285552141-fa5fcb38-55a1-4a12-ac99-df3fa327627c-83-1-c000.csv"
df = spark.read.format("csv")\
.option("header", "True")\
.option("inferSchema", "True")\
.option("timestampFormat",'yyyy-/MM/DD hh:mm:ss')\
.load(file)

In [0]:
# imprime as 10 primeiras linhas do dataframe 
df.show(10)

#### Escrevendo dados em paralelo

In [0]:
# Write in parallel: repartition into 5 partitions, one part file per partition.
# Look at the directory that gets created.
# A separate output directory is used because `df` is lazily read from
# saida_2010_12_01.csv; overwriting that directory would delete the input while it is read.
(
    df.repartition(5)
    .write.format("csv")
    .mode("overwrite")
    .option("sep", ",")
    .option("header", "true")
    .save("/FileStore/tables/bronze/saida_2010_12_01_particionado.csv")
)

### Arquivos Parquet

##### **Convertendo .csv para .parquet**
- Dataset .csv usado https://www.kaggle.com/nhs/general-practice-prescribing-data

In [0]:
# Lendo todos os arquivos .csv do diretório bigdata (>4GB)
df = spark.read.format("csv")\
.option("header", "True")\
.option("inferSchema","True")\
.load("/FileStore/tables/bigdata/*.csv")

In [0]:
display(df.head(10))

practice,bnf_code,bnf_name,items,nic,act_cost,quantity
5668,8092,592,2,44.1,40.84,189
1596,17512,16983,2,1.64,1.64,35
1596,25587,16124,1,1.26,1.28,42
1596,12551,1282,2,0.86,1.02,42
1596,18938,10575,1,1.85,1.82,56
1596,8777,21507,1,3.31,3.18,56
1596,9369,12008,1,63.15,58.56,56
1596,27926,17643,2,158.66,147.07,56
1596,26148,10230,1,0.35,0.44,14
1596,9148,3381,1,0.26,0.35,7


In [0]:
df.printSchema()

In [0]:
# conta a quantidade de linhas
df.count()

*Atente para NÃO escrever e ler arquivos parquet com versões diferentes (do Spark/Parquet).*

In [0]:
# Write the DataFrame in Parquet format (Spark creates a directory of part files).
destino_parquet = "/FileStore/tables/bronze/df-parquet-file.parquet"
df.write.mode("overwrite").format("parquet").save(destino_parquet)

In [0]:
%fs
ls /FileStore/tables/bronze/df-parquet-file.parquet

path,name,size
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/_SUCCESS,_SUCCESS,0
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/_committed_1074406448632183027,_committed_1074406448632183027,7399
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/_committed_6811788773775010210,_committed_6811788773775010210,3689
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/_committed_vacuum5025950197839729268,_committed_vacuum5025950197839729268,96
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/_started_1074406448632183027,_started_1074406448632183027,0
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00000-tid-1074406448632183027-221322a5-643d-4c10-a0b2-7a38a1bfb2eb-158-1-c000.snappy.parquet,part-00000-tid-1074406448632183027-221322a5-643d-4c10-a0b2-7a38a1bfb2eb-158-1-c000.snappy.parquet,43227662
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00001-tid-1074406448632183027-221322a5-643d-4c10-a0b2-7a38a1bfb2eb-159-1-c000.snappy.parquet,part-00001-tid-1074406448632183027-221322a5-643d-4c10-a0b2-7a38a1bfb2eb-159-1-c000.snappy.parquet,43131608
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00002-tid-1074406448632183027-221322a5-643d-4c10-a0b2-7a38a1bfb2eb-160-1-c000.snappy.parquet,part-00002-tid-1074406448632183027-221322a5-643d-4c10-a0b2-7a38a1bfb2eb-160-1-c000.snappy.parquet,43345958
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00003-tid-1074406448632183027-221322a5-643d-4c10-a0b2-7a38a1bfb2eb-161-1-c000.snappy.parquet,part-00003-tid-1074406448632183027-221322a5-643d-4c10-a0b2-7a38a1bfb2eb-161-1-c000.snappy.parquet,43186513
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00004-tid-1074406448632183027-221322a5-643d-4c10-a0b2-7a38a1bfb2eb-162-1-c000.snappy.parquet,part-00004-tid-1074406448632183027-221322a5-643d-4c10-a0b2-7a38a1bfb2eb-162-1-c000.snappy.parquet,43215481


In [0]:
# lendo arquivos parquet
# atente para a velocidade de leitura
df_parquet = spark.read.format("parquet")\
.load("/FileStore/tables/bronze/df-parquet-file.parquet")

In [0]:
# conta a quantidade de linhas do dataframe
df_parquet.count()

In [0]:
# visualizando o dataframe
display(df_parquet.head(10))

practice,bnf_code,bnf_name,items,nic,act_cost,quantity
3626,12090,20521,3,8.4,7.82,168
3626,23511,11576,1,32.18,29.81,28
3626,14802,14672,162,141.13,133.93,4760
3626,14590,10011,17,15.01,14.12,532
3626,24483,13726,69,57.57,54.67,2121
3626,7768,22070,155,113.03,109.41,4144
3626,1877,13598,102,68.5,67.4,2370
3626,18110,3990,189,156.66,150.44,5222
3626,14058,2144,23,23.52,22.48,588
3626,4558,5695,32,116.64,109.21,756


In [0]:
# Show the files (and their sizes) inside the Parquet directory.
arquivos_parquet = dbutils.fs.ls("/FileStore/tables/bronze/df-parquet-file.parquet")
display(arquivos_parquet)

path,name,size
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/_SUCCESS,_SUCCESS,0
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/_committed_1074406448632183027,_committed_1074406448632183027,7399
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/_committed_6811788773775010210,_committed_6811788773775010210,3689
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/_committed_vacuum5025950197839729268,_committed_vacuum5025950197839729268,96
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/_started_1074406448632183027,_started_1074406448632183027,0
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00000-tid-1074406448632183027-221322a5-643d-4c10-a0b2-7a38a1bfb2eb-158-1-c000.snappy.parquet,part-00000-tid-1074406448632183027-221322a5-643d-4c10-a0b2-7a38a1bfb2eb-158-1-c000.snappy.parquet,43227662
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00001-tid-1074406448632183027-221322a5-643d-4c10-a0b2-7a38a1bfb2eb-159-1-c000.snappy.parquet,part-00001-tid-1074406448632183027-221322a5-643d-4c10-a0b2-7a38a1bfb2eb-159-1-c000.snappy.parquet,43131608
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00002-tid-1074406448632183027-221322a5-643d-4c10-a0b2-7a38a1bfb2eb-160-1-c000.snappy.parquet,part-00002-tid-1074406448632183027-221322a5-643d-4c10-a0b2-7a38a1bfb2eb-160-1-c000.snappy.parquet,43345958
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00003-tid-1074406448632183027-221322a5-643d-4c10-a0b2-7a38a1bfb2eb-161-1-c000.snappy.parquet,part-00003-tid-1074406448632183027-221322a5-643d-4c10-a0b2-7a38a1bfb2eb-161-1-c000.snappy.parquet,43186513
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00004-tid-1074406448632183027-221322a5-643d-4c10-a0b2-7a38a1bfb2eb-162-1-c000.snappy.parquet,part-00004-tid-1074406448632183027-221322a5-643d-4c10-a0b2-7a38a1bfb2eb-162-1-c000.snappy.parquet,43215481


In [0]:
%scala
// Script to get the total size in gigabytes:
// list the files under the parquet directory and expose that listing
// (path, name, size columns) as the temp view adlsSize, summed by the SQL cell below.
val path="/FileStore/tables/bronze/df-parquet-file.parquet"
val filelist=dbutils.fs.ls(path)
val df_temp = filelist.toDF()
df_temp.createOrReplaceTempView("adlsSize")

In [0]:
%sql
-- Total size of the files listed in the adlsSize view, in GB (rounded to 3 decimals).
SELECT
  round(sum(size) / (1024 * 1024 * 1024), 3) AS sizeInGB
FROM adlsSize

sizeInGB
1.302


### Spark + PostgreSQL
- Consultar e escrever em um banco de dados relacional.

In [0]:
# Isso é equivalente a executar uma query como: select * from pg_catalog.pg_tables
# jdbc:postgresql://pgserver-1.postgres.database.azure.com:5432/{your_database}?user=stack_user@pgserver-1&password={your_password}&sslmode=require
pgDF = spark.read.format("jdbc")\
.option("driver", "org.postgresql.Driver")\
.option("url", "jdbc:postgresql://pgserver-1.postgres.database.azure.com:5432/postgres?user=stack_user@pgserver-1&password=Bigdata2021&sslmode=require")\
.option("dbtable", "pg_catalog.pg_tables")\
.option("user", "stack_user").option("password", "Bigdata2021").load()

In [0]:
# imprime todas as linhas do dataframe
display(pgDF.collect())

schemaname,tablename,tableowner,tablespace,hasindexes,hasrules,hastriggers,rowsecurity
pg_catalog,pg_statistic,azure_superuser,,True,False,False,False
pg_catalog,pg_foreign_table,azure_superuser,,True,False,False,False
pg_catalog,pg_authid,azure_superuser,pg_global,True,False,False,False
pg_catalog,pg_user_mapping,azure_superuser,,True,False,False,False
pg_catalog,pg_subscription,azure_superuser,pg_global,True,False,False,False
pg_catalog,pg_largeobject,azure_superuser,,True,False,False,False
pg_catalog,pg_type,azure_superuser,,True,False,False,False
pg_catalog,pg_attribute,azure_superuser,,True,False,False,False
pg_catalog,pg_proc,azure_superuser,,True,False,False,False
pg_catalog,pg_class,azure_superuser,,True,False,False,False


In [0]:
# consulta dados da coluna schemaname
pgDF.select("schemaname").distinct().show()

In [0]:
# Especifica uma query diretamente.
# Útil para evitar o "select * from."
pgDF = spark.read.format("jdbc")\
.option("driver", "org.postgresql.Driver")\
.option("url", "jdbc:postgresql://pgserver-1.postgres.database.azure.com:5432/postgres?user=stack_user@pgserver-1&password=Bigdata2021&sslmode=require")\
.option("query", "select schemaname,tablename from pg_catalog.pg_tables")\
.option("user", "stack_user").option("password", "Bigdata2021").load()

In [0]:
# imprime todas as linhas do dataframe
display(pgDF.collect())

schemaname,tablename
pg_catalog,pg_statistic
pg_catalog,pg_foreign_table
pg_catalog,pg_authid
pg_catalog,pg_user_mapping
pg_catalog,pg_subscription
pg_catalog,pg_largeobject
pg_catalog,pg_type
pg_catalog,pg_attribute
pg_catalog,pg_proc
pg_catalog,pg_class


In [0]:
# imprime as 5 linhas do dataframe df
# não se esqueça de recriar esse dataframe.
df.show(5)

In [0]:
# cria a tabela "produtos" a apartir dos dados do dataframe df.
pgDF.write.mode("overwrite")\
.format("jdbc")\
.option("url", "jdbc:postgresql://pgserver-1.postgres.database.azure.com:5432/postgres?user=stack_user@pgserver-1&password=Bigdata2021&sslmode=require")\
.option("dbtable", "produtos")\
.option("user", "stack_user")\
.option("password", "Bigdata2021")\
.save()

In [0]:
# cria o dataframe df_produtos a partir da tabela criada.
df_produtos = spark.read.format("jdbc")\
.option("driver", "org.postgresql.Driver")\
.option("url", "jdbc:postgresql://pgserver-1.postgres.database.azure.com:5432/postgres?user=stack_user@pgserver-1&password=Bigdata2021&sslmode=require")\
.option("dbtable", "produtos")\
.option("user", "stack_user").option("password", "Bigdata2021").load()

In [0]:
# imprime as linhas do dataframe.
display(df_produtos.collect())

schemaname,tablename
pg_catalog,pg_statistic
pg_catalog,pg_foreign_table
pg_catalog,pg_authid
public,produtos
pg_catalog,pg_user_mapping
pg_catalog,pg_subscription
pg_catalog,pg_largeobject
pg_catalog,pg_type
pg_catalog,pg_attribute
pg_catalog,pg_proc
