#1 - Importando Bibliotecas

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col, current_date
import os
from pyspark.sql.types import IntegerType, DoubleType, FloatType, StringType
from pyspark.sql.functions import col, lit, when

#2 - Exibindo ponto de montagem 

In [0]:
display(dbutils.fs.ls("/mnt/data"))

path,name,size,modificationTime
dbfs:/mnt/data/processed/,processed/,0,0
dbfs:/mnt/data/raw/,raw/,0,0
dbfs:/mnt/data/refined/,refined/,0,0


#3 - Leitura dos dados na camada Raw

In [0]:
# Listar os arquivos na pasta 'tabelas'
files = dbutils.fs.ls("/mnt/data/raw/tabelas")

# Iterar sobre cada arquivo e exibir seu conteúdo
for file in files:
    print(f"Lendo o arquivo: {file.name}")  # Exibir o nome do arquivo
    df_csv = spark.read.format("csv").option("header", "true").option("sep", ";").option("inferSchema", "true").load(file.path)  # Carregar o arquivo CSV atual
    
    # Exibir o conteúdo do DataFrame
#    display(df_csv)


Lendo o arquivo: categories.csv
Lendo o arquivo: customer_customer_demo.csv
Lendo o arquivo: customer_demographics.csv
Lendo o arquivo: customers.csv
Lendo o arquivo: employee_territories.csv
Lendo o arquivo: employees.csv
Lendo o arquivo: order_details.csv
Lendo o arquivo: orders.csv
Lendo o arquivo: products.csv
Lendo o arquivo: region.csv
Lendo o arquivo: shippers.csv
Lendo o arquivo: suppliers.csv
Lendo o arquivo: territories.csv
Lendo o arquivo: us_states.csv


#4 - Transformando os arquivos em parquet e salvando na camada Processed

In [0]:
# Caminhos de entrada e saída
input_directory = "/mnt/data/raw/tabelas"
output_directory = "/mnt/data/processed/tabelas_parquet/"

# Listar todos os arquivos na pasta raw/tabelas
files = dbutils.fs.ls(input_directory)

# Processar cada arquivo CSV
for file in files:
    file_name = file.name  # Nome do arquivo
    if file_name.endswith(".csv"):  # Garantir que o arquivo seja CSV
        print(f"Processando arquivo: {file_name}")
        
        # Caminho completo do arquivo na pasta raw
        input_path = f"{input_directory}/{file_name}"
        
        # Nome do arquivo Parquet de saída (adaptado ao nome original do CSV)
        output_file = f"{output_directory}rw_northwind_{file_name.replace('.csv', '.parquet')}"
        
        # Ler o arquivo CSV
        df = spark.read.format("csv").option("header", "true").option("sep", ";").option("inferSchema", "true").load(input_path)
        
        # Reduzir para uma única partição
        df_single_partition = df.coalesce(1)
        
        # Salvar em um diretório temporário
        temp_output_path = f"{output_directory}temp_output_{file_name.replace('.csv', '')}"
        df_single_partition.write.mode("overwrite").parquet(temp_output_path)
        
        # Identificar o arquivo Parquet gerado
        temp_files = dbutils.fs.ls(temp_output_path)
        parquet_file = [file.path for file in temp_files if file.path.endswith(".parquet")][0]
        
        # Renomear o arquivo Parquet para o nome final
        dbutils.fs.mv(parquet_file, output_file)
        
        # Remover o diretório temporário
        dbutils.fs.rm(temp_output_path, recurse=True)
        
        print(f"Arquivo {file_name} salvo como: {output_file}")

print("Processamento concluído para todos os arquivos.")

Processando arquivo: categories.csv
Arquivo categories.csv salvo como: /mnt/data/processed/tabelas_parquet/rw_northwind_categories.parquet
Processando arquivo: customer_customer_demo.csv
Arquivo customer_customer_demo.csv salvo como: /mnt/data/processed/tabelas_parquet/rw_northwind_customer_customer_demo.parquet
Processando arquivo: customer_demographics.csv
Arquivo customer_demographics.csv salvo como: /mnt/data/processed/tabelas_parquet/rw_northwind_customer_demographics.parquet
Processando arquivo: customers.csv
Arquivo customers.csv salvo como: /mnt/data/processed/tabelas_parquet/rw_northwind_customers.parquet
Processando arquivo: employee_territories.csv
Arquivo employee_territories.csv salvo como: /mnt/data/processed/tabelas_parquet/rw_northwind_employee_territories.parquet
Processando arquivo: employees.csv
Arquivo employees.csv salvo como: /mnt/data/processed/tabelas_parquet/rw_northwind_employees.parquet
Processando arquivo: order_details.csv
Arquivo order_details.csv salvo co

In [0]:
# Lista de arquivos CSV originais processados
csv_files = [
    "customers",
    "orders",
    "products",
    "categories",
    "suppliers",
    "order_details",
    "employees",
    "territories",
    "region",
    "shippers",
    "customer_customer_demo",
    "customer_demographics",
    "employee_territories",
    "us_states"
]

# Loop para processar cada arquivo
for file_name in csv_files:
    print(f"\n\n----- Processando o arquivo: {file_name} -----")
    
    # Caminho do arquivo Parquet na camada processed
    parquet_file = f"/mnt/data/processed/tabelas_parquet/rw_northwind_{file_name}.parquet"
    
    try:
        # Ler o arquivo Parquet
        df_parquet = spark.read.format("parquet").load(parquet_file)
        
        # Exibir o esquema da tabela
        print(f"Esquema do DataFrame para {file_name}:")
        df_parquet.printSchema()
        
        # Exibir os dados de forma tabulada
        print(f"Visualizando os dados de {file_name}:")
        display(df_parquet)
    
    except Exception as e:
        print(f"Erro ao processar o arquivo {file_name}: {e}")




----- Processando o arquivo: customers -----
Esquema do DataFrame para customers:
root
 |-- customer_id: string (nullable = true)
 |-- company_name: string (nullable = true)
 |-- contact_name: string (nullable = true)
 |-- contact_title: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- region: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- fax: string (nullable = true)

Visualizando os dados de customers:


customer_id,company_name,contact_name,contact_title,address,city,region,postal_code,country,phone,fax
ALFKI,Alfreds Futterkiste,Maria Anders,Sales Representative,Obere Str. 57,Berlin,,12209,Germany,030-0074321,030-0076545
ANATR,Ana Trujillo Emparedados y helados,Ana Trujillo,Owner,Avda. de la Constitución 2222,México D.F.,,05021,Mexico,(5) 555-4729,(5) 555-3745
ANTON,Antonio Moreno Taquería,Antonio Moreno,Owner,Mataderos 2312,México D.F.,,05023,Mexico,(5) 555-3932,
AROUT,Around the Horn,Thomas Hardy,Sales Representative,120 Hanover Sq.,London,,WA1 1DP,UK,(171) 555-7788,(171) 555-6750
BERGS,Berglunds snabbköp,Christina Berglund,Order Administrator,Berguvsvägen 8,Luleå,,S-958 22,Sweden,0921-12 34 65,0921-12 34 67
BLAUS,Blauer See Delikatessen,Hanna Moos,Sales Representative,Forsterstr. 57,Mannheim,,68306,Germany,0621-08460,0621-08924
BLONP,Blondesddsl père et fils,Frédérique Citeaux,Marketing Manager,"24, place Kléber",Strasbourg,,67000,France,88.60.15.31,88.60.15.32
BOLID,Bólido Comidas preparadas,Martín Sommer,Owner,"C/ Araquil, 67",Madrid,,28023,Spain,(91) 555 22 82,(91) 555 91 99
BONAP,Bon app',Laurence Lebihan,Owner,"12, rue des Bouchers",Marseille,,13008,France,91.24.45.40,91.24.45.41
BOTTM,Bottom-Dollar Markets,Elizabeth Lincoln,Accounting Manager,23 Tsawassen Blvd.,Tsawassen,BC,T2F 8M4,Canada,(604) 555-4729,(604) 555-3745




----- Processando o arquivo: orders -----
Esquema do DataFrame para orders:
root
 |-- order_id: integer (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- employee_id: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- required_date: date (nullable = true)
 |-- shipped_date: date (nullable = true)
 |-- ship_via: integer (nullable = true)
 |-- freight: double (nullable = true)
 |-- ship_name: string (nullable = true)
 |-- ship_address: string (nullable = true)
 |-- ship_city: string (nullable = true)
 |-- ship_region: string (nullable = true)
 |-- ship_postal_code: string (nullable = true)
 |-- ship_country: string (nullable = true)

Visualizando os dados de orders:


order_id,customer_id,employee_id,order_date,required_date,shipped_date,ship_via,freight,ship_name,ship_address,ship_city,ship_region,ship_postal_code,ship_country
10248,VINET,5,1996-07-04,1996-08-01,1996-07-16,3,32.38,Vins et alcools Chevalier,59 rue de l'Abbaye,Reims,,51100,France
10249,TOMSP,6,1996-07-05,1996-08-16,1996-07-10,1,11.61,Toms Spezialitäten,Luisenstr. 48,Münster,,44087,Germany
10250,HANAR,4,1996-07-08,1996-08-05,1996-07-12,2,65.83,Hanari Carnes,"Rua do Paço, 67",Rio de Janeiro,RJ,05454-876,Brazil
10251,VICTE,3,1996-07-08,1996-08-05,1996-07-15,1,41.34,Victuailles en stock,"2, rue du Commerce",Lyon,,69004,France
10252,SUPRD,4,1996-07-09,1996-08-06,1996-07-11,2,51.3,Suprêmes délices,"Boulevard Tirou, 255",Charleroi,,B-6000,Belgium
10253,HANAR,3,1996-07-10,1996-07-24,1996-07-16,2,58.17,Hanari Carnes,"Rua do Paço, 67",Rio de Janeiro,RJ,05454-876,Brazil
10254,CHOPS,5,1996-07-11,1996-08-08,1996-07-23,2,22.98,Chop-suey Chinese,Hauptstr. 31,Bern,,3012,Switzerland
10255,RICSU,9,1996-07-12,1996-08-09,1996-07-15,3,148.33,Richter Supermarkt,Starenweg 5,Genève,,1204,Switzerland
10256,WELLI,3,1996-07-15,1996-08-12,1996-07-17,2,13.97,Wellington Importadora,"Rua do Mercado, 12",Resende,SP,08737-363,Brazil
10257,HILAA,4,1996-07-16,1996-08-13,1996-07-22,3,81.91,HILARION-Abastos,Carrera 22 con Ave. Carlos Soublette #8-35,San Cristóbal,Táchira,5022,Venezuela




----- Processando o arquivo: products -----
Esquema do DataFrame para products:
root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- supplier_id: integer (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- quantity_per_unit: string (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- units_in_stock: integer (nullable = true)
 |-- units_on_order: integer (nullable = true)
 |-- reorder_level: integer (nullable = true)
 |-- discontinued: integer (nullable = true)

Visualizando os dados de products:


product_id,product_name,supplier_id,category_id,quantity_per_unit,unit_price,units_in_stock,units_on_order,reorder_level,discontinued
1,Chai,8,1,10 boxes x 30 bags,18.0,39,0,10,1
2,Chang,1,1,24 - 12 oz bottles,19.0,17,40,25,1
3,Aniseed Syrup,1,2,12 - 550 ml bottles,10.0,13,70,25,0
4,Chef Anton's Cajun Seasoning,2,2,48 - 6 oz jars,22.0,53,0,0,0
5,Chef Anton's Gumbo Mix,2,2,36 boxes,21.35,0,0,0,1
6,Grandma's Boysenberry Spread,3,2,12 - 8 oz jars,25.0,120,0,25,0
7,Uncle Bob's Organic Dried Pears,3,7,12 - 1 lb pkgs.,30.0,15,0,10,0
8,Northwoods Cranberry Sauce,3,2,12 - 12 oz jars,40.0,6,0,0,0
9,Mishi Kobe Niku,4,6,18 - 500 g pkgs.,97.0,29,0,0,1
10,Ikura,4,8,12 - 200 ml jars,31.0,31,0,0,0




----- Processando o arquivo: categories -----
Esquema do DataFrame para categories:
root
 |-- category_id: integer (nullable = true)
 |-- category_name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- picture: string (nullable = true)

Visualizando os dados de categories:


category_id,category_name,description,picture
1,Beverages,"Soft drinks, coffees, teas, beers, and ales",\x
2,Condiments,"Sweet and savory sauces, relishes, spreads, and seasonings",\x
3,Confections,"Desserts, candies, and sweet breads",\x
4,Dairy Products,Cheeses,\x
5,Grains/Cereals,"Breads, crackers, pasta, and cereal",\x
6,Meat/Poultry,Prepared meats,\x
7,Produce,Dried fruit and bean curd,\x
8,Seafood,Seaweed and fish,\x




----- Processando o arquivo: suppliers -----
Esquema do DataFrame para suppliers:
root
 |-- supplier_id: integer (nullable = true)
 |-- company_name: string (nullable = true)
 |-- contact_name: string (nullable = true)
 |-- contact_title: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- region: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- fax: string (nullable = true)
 |-- homepage: string (nullable = true)

Visualizando os dados de suppliers:


supplier_id,company_name,contact_name,contact_title,address,city,region,postal_code,country,phone,fax,homepage
1,Exotic Liquids,Charlotte Cooper,Purchasing Manager,49 Gilbert St.,London,,EC1 4SD,UK,(171) 555-2222,,
2,New Orleans Cajun Delights,Shelley Burke,Order Administrator,P.O. Box 78934,New Orleans,LA,70117,USA,(100) 555-4822,,#CAJUN.HTM#
3,Grandma Kelly's Homestead,Regina Murphy,Sales Representative,707 Oxford Rd.,Ann Arbor,MI,48104,USA,(313) 555-5735,(313) 555-3349,
4,Tokyo Traders,Yoshi Nagase,Marketing Manager,9-8 Sekimai Musashino-shi,Tokyo,,100,Japan,(03) 3555-5011,,
5,Cooperativa de Quesos 'Las Cabras',Antonio del Valle Saavedra,Export Administrator,Calle del Rosal 4,Oviedo,Asturias,33007,Spain,(98) 598 76 54,,
6,Mayumi's,Mayumi Ohno,Marketing Representative,92 Setsuko Chuo-ku,Osaka,,545,Japan,(06) 431-7877,,Mayumi's (on the World Wide Web)#http://www.microsoft.com/accessdev/sampleapps/mayumi.htm#
7,"Pavlova, Ltd.",Ian Devling,Marketing Manager,74 Rose St. Moonie Ponds,Melbourne,Victoria,3058,Australia,(03) 444-2343,(03) 444-6588,
8,"Specialty Biscuits, Ltd.",Peter Wilson,Sales Representative,29 King's Way,Manchester,,M14 GSD,UK,(161) 555-4448,,
9,PB Knäckebröd AB,Lars Peterson,Sales Agent,Kaloadagatan 13,Göteborg,,S-345 67,Sweden,031-987 65 43,031-987 65 91,
10,Refrescos Americanas LTDA,Carlos Diaz,Marketing Manager,Av. das Americanas 12.890,Sao Paulo,,5442,Brazil,(11) 555 4640,,




----- Processando o arquivo: order_details -----
Esquema do DataFrame para order_details:
root
 |-- order_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- discount: double (nullable = true)

Visualizando os dados de order_details:


order_id,product_id,unit_price,quantity,discount
10248,11,14.0,12,0.0
10248,42,9.8,10,0.0
10248,72,34.8,5,0.0
10249,14,18.6,9,0.0
10249,51,42.4,40,0.0
10250,41,7.7,10,0.0
10250,51,42.4,35,0.15
10250,65,16.8,15,0.15
10251,22,16.8,6,0.05
10251,57,15.6,15,0.05




----- Processando o arquivo: employees -----
Esquema do DataFrame para employees:
root
 |-- employee_id: integer (nullable = true)
 |-- last_name: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- title: string (nullable = true)
 |-- title_of_courtesy: string (nullable = true)
 |-- birth_date: date (nullable = true)
 |-- hire_date: date (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- region: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- home_phone: string (nullable = true)
 |-- extension: integer (nullable = true)
 |-- photo: string (nullable = true)
 |-- notes: string (nullable = true)
 |-- reports_to: integer (nullable = true)
 |-- photo_path: string (nullable = true)

Visualizando os dados de employees:


employee_id,last_name,first_name,title,title_of_courtesy,birth_date,hire_date,address,city,region,postal_code,country,home_phone,extension,photo,notes,reports_to,photo_path
1,Davolio,Nancy,Sales Representative,Ms.,1948-12-08,1992-05-01,507 - 20th Ave. E.\nApt. 2A,Seattle,WA,98122,USA,(206) 555-9857,5467,\x,Education includes a BA in psychology from Colorado State University in 1970. She also completed The Art of the Cold Call. Nancy is a member of Toastmasters International.,2.0,http://accweb/emmployees/davolio.bmp
2,Fuller,Andrew,"Vice President, Sales",Dr.,1952-02-19,1992-08-14,908 W. Capital Way,Tacoma,WA,98401,USA,(206) 555-9482,3457,\x,"Andrew received his BTS commercial in 1974 and a Ph.D. in international marketing from the University of Dallas in 1981. He is fluent in French and Italian and reads German. He joined the company as a sales representative, was promoted to sales manager in January 1992 and to vice president of sales in March 1993. Andrew is a member of the Sales Management Roundtable, the Seattle Chamber of Commerce, and the Pacific Rim Importers Association.",,http://accweb/emmployees/fuller.bmp
3,Leverling,Janet,Sales Representative,Ms.,1963-08-30,1992-04-01,722 Moss Bay Blvd.,Kirkland,WA,98033,USA,(206) 555-3412,3355,\x,Janet has a BS degree in chemistry from Boston College (1984). She has also completed a certificate program in food retailing management. Janet was hired as a sales associate in 1991 and promoted to sales representative in February 1992.,2.0,http://accweb/emmployees/leverling.bmp
4,Peacock,Margaret,Sales Representative,Mrs.,1937-09-19,1993-05-03,4110 Old Redmond Rd.,Redmond,WA,98052,USA,(206) 555-8122,5176,\x,Margaret holds a BA in English literature from Concordia College (1958) and an MA from the American Institute of Culinary Arts (1966). She was assigned to the London office temporarily from July through November 1992.,2.0,http://accweb/emmployees/peacock.bmp
5,Buchanan,Steven,Sales Manager,Mr.,1955-03-04,1993-10-17,14 Garrett Hill,London,,SW1 8JR,UK,(71) 555-4848,3453,\x,"Steven Buchanan graduated from St. Andrews University, Scotland, with a BSC degree in 1976. Upon joining the company as a sales representative in 1992, he spent 6 months in an orientation program at the Seattle office and then returned to his permanent post in London. He was promoted to sales manager in March 1993. Mr. Buchanan has completed the courses Successful Telemarketing and International Sales Management. He is fluent in French.",2.0,http://accweb/emmployees/buchanan.bmp
6,Suyama,Michael,Sales Representative,Mr.,1963-07-02,1993-10-17,Coventry House\nMiner Rd.,London,,EC2 7JR,UK,(71) 555-7773,428,\x,"Michael is a graduate of Sussex University (MA, economics, 1983) and the University of California at Los Angeles (MBA, marketing, 1986). He has also taken the courses Multi-Cultural Selling and Time Management for the Sales Professional. He is fluent in Japanese and can read and write French, Portuguese, and Spanish.",5.0,http://accweb/emmployees/davolio.bmp
7,King,Robert,Sales Representative,Mr.,1960-05-29,1994-01-02,Edgeham Hollow\nWinchester Way,London,,RG1 9SP,UK,(71) 555-5598,465,\x,"Robert King served in the Peace Corps and traveled extensively before completing his degree in English at the University of Michigan in 1992, the year he joined the company. After completing a course entitled Selling in Europe, he was transferred to the London office in March 1993.",5.0,http://accweb/emmployees/davolio.bmp
8,Callahan,Laura,Inside Sales Coordinator,Ms.,1958-01-09,1994-03-05,4726 - 11th Ave. N.E.,Seattle,WA,98105,USA,(206) 555-1189,2344,\x,Laura received a BA in psychology from the University of Washington. She has also completed a course in business French. She reads and writes French.,2.0,http://accweb/emmployees/davolio.bmp
9,Dodsworth,Anne,Sales Representative,Ms.,1966-01-27,1994-11-15,7 Houndstooth Rd.,London,,WG2 7LT,UK,(71) 555-4444,452,\x,Anne has a BA degree in English from St. Lawrence College. She is fluent in French and German.,5.0,http://accweb/emmployees/davolio.bmp




----- Processando o arquivo: territories -----
Esquema do DataFrame para territories:
root
 |-- territory_id: integer (nullable = true)
 |-- territory_description: string (nullable = true)
 |-- region_id: integer (nullable = true)

Visualizando os dados de territories:


territory_id,territory_description,region_id
1581,Westboro,1
1730,Bedford,1
1833,Georgetow,1
2116,Boston,1
2139,Cambridge,1
2184,Braintree,1
2903,Providence,1
3049,Hollis,3
3801,Portsmouth,3
6897,Wilton,1




----- Processando o arquivo: region -----
Esquema do DataFrame para region:
root
 |-- region_id: integer (nullable = true)
 |-- region_description: string (nullable = true)

Visualizando os dados de region:


region_id,region_description
1,Eastern
2,Western
3,Northern
4,Southern




----- Processando o arquivo: shippers -----
Esquema do DataFrame para shippers:
root
 |-- shipper_id: integer (nullable = true)
 |-- company_name: string (nullable = true)
 |-- phone: string (nullable = true)

Visualizando os dados de shippers:


shipper_id,company_name,phone
1,Speedy Express,(503) 555-9831
2,United Package,(503) 555-3199
3,Federal Shipping,(503) 555-9931
4,Alliance Shippers,1-800-222-0451
5,UPS,1-800-782-7892
6,DHL,1-800-225-5345




----- Processando o arquivo: customer_customer_demo -----
Esquema do DataFrame para customer_customer_demo:
root
 |-- customer_id: string (nullable = true)
 |-- customer_type_id: string (nullable = true)

Visualizando os dados de customer_customer_demo:


customer_id,customer_type_id




----- Processando o arquivo: customer_demographics -----
Esquema do DataFrame para customer_demographics:
root
 |-- customer_type_id: string (nullable = true)
 |-- customer_desc: string (nullable = true)

Visualizando os dados de customer_demographics:


customer_type_id,customer_desc




----- Processando o arquivo: employee_territories -----
Esquema do DataFrame para employee_territories:
root
 |-- employee_id: integer (nullable = true)
 |-- territory_id: integer (nullable = true)

Visualizando os dados de employee_territories:


employee_id,territory_id
1,6897
1,19713
2,1581
2,1730
2,1833
2,2116
2,2139
2,2184
2,40222
3,30346




----- Processando o arquivo: us_states -----
Esquema do DataFrame para us_states:
root
 |-- state_id: integer (nullable = true)
 |-- state_name: string (nullable = true)
 |-- state_abbr: string (nullable = true)
 |-- state_region: string (nullable = true)

Visualizando os dados de us_states:


state_id,state_name,state_abbr,state_region
1,Alabama,AL,south
2,Alaska,AK,north
3,Arizona,AZ,west
4,Arkansas,AR,south
5,California,CA,west
6,Colorado,CO,west
7,Connecticut,CT,east
8,Delaware,DE,east
9,District of Columbia,DC,east
10,Florida,FL,south


#5 - Validando Dados

In [0]:
from pyspark.sql.types import StructType

# Lista de arquivos na processed
processed_files = [
    "customers", "orders", "products", "categories", "suppliers",
    "order_details", "employees", "territories", "region",
    "shippers", "customer_customer_demo", "customer_demographics",
    "employee_territories", "us_states"
]

# Validação dos esquemas
for file_name in processed_files:
    print(f"\nValidando o esquema para: {file_name}")
    
    # Caminho do arquivo Parquet
    parquet_file = f"/mnt/data/processed/tabelas_parquet/rw_northwind_{file_name}.parquet"
    
    try:
        # Ler o DataFrame
        df = spark.read.format("parquet").load(parquet_file)
        
        # Exibir o esquema
        print(f"Esquema de {file_name}:")
        df.printSchema()
    except Exception as e:
        print(f"Erro ao validar o esquema de {file_name}: {e}")



Validando o esquema para: customers
Esquema de customers:
root
 |-- customer_id: string (nullable = true)
 |-- company_name: string (nullable = true)
 |-- contact_name: string (nullable = true)
 |-- contact_title: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- region: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- fax: string (nullable = true)


Validando o esquema para: orders
Esquema de orders:
root
 |-- order_id: integer (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- employee_id: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- required_date: date (nullable = true)
 |-- shipped_date: date (nullable = true)
 |-- ship_via: integer (nullable = true)
 |-- freight: double (nullable = true)
 |-- ship_name: string (nullable = true)
 |-- ship_address: string (nullable = true)
 |-- ship_city: 

In [0]:
# Lista de arquivos na processed
processed_files = [
    "customers", "orders", "products", "categories", "suppliers",
    "order_details", "employees", "territories", "region",
    "shippers", "customer_customer_demo", "customer_demographics",
    "employee_territories", "us_states"
]

# Lista para armazenar resultados
resultados_validacao = []

# Loop para processar cada tabela
for file_name in processed_files:
    parquet_file = f"/mnt/data/processed/tabelas_parquet/rw_northwind_{file_name}.parquet"
    print(f"\nValidando: {file_name}")
    
    try:
        # Ler o DataFrame
        df = spark.read.format("parquet").load(parquet_file)
        
        # Verificar nulos
        nulos = {col: df.filter(df[col].isNull()).count() for col in df.columns}
        total_nulos = sum(nulos.values())
        
        # Verificar duplicatas
        duplicatas = df.count() - df.dropDuplicates().count()
        
        # Adicionar resultado
        status = "OK" if total_nulos == 0 and duplicatas == 0 else "Erro"
        resultados_validacao.append((file_name, status, total_nulos, duplicatas))
    
    except Exception as e:
        print(f"Erro ao processar {file_name}: {e}")
        resultados_validacao.append((file_name, "Erro", None, None))

# Criar DataFrame com os resultados
df_resultados = spark.createDataFrame(
    resultados_validacao,
    ["Tabela", "Status", "Nulos", "Duplicatas"]
)

# Salvar resultados em Parquet
output_path = "/mnt/data/processed/validacao/relatorio_validacao.parquet"
df_resultados.write.mode("overwrite").parquet(output_path)

# Exibir resultados
display(df_resultados)



Validando: customers

Validando: orders

Validando: products

Validando: categories

Validando: suppliers

Validando: order_details

Validando: employees

Validando: territories

Validando: region

Validando: shippers

Validando: customer_customer_demo

Validando: customer_demographics

Validando: employee_territories

Validando: us_states


Tabela,Status,Nulos,Duplicatas
customers,Erro,83,0
orders,Erro,547,0
products,OK,0,0
categories,OK,0,0
suppliers,Erro,60,0
order_details,OK,0,0
employees,Erro,5,0
territories,OK,0,0
region,OK,0,0
shippers,OK,0,0


In [0]:
def treat_nulls_by_type_and_save(table_name, table_path, output_path):
    print(f"Tratando valores nulos para a tabela: {table_name}")

    # Ler a tabela Parquet
    df = spark.read.format("parquet").load(table_path)

    # Tratar nulos com base no tipo de dado
    for column, dtype in df.dtypes:
        if dtype in ["int", "bigint", "float", "double"]:  # Colunas numéricas
            df = df.withColumn(column, when(col(column).isNull(), lit(0)).otherwise(col(column)))
        elif dtype == "string":  # Colunas string
            df = df.withColumn(column, when(col(column).isNull(), lit("")).otherwise(col(column)))
        elif dtype == "date":  # Colunas de data
            df = df.withColumn(column, when(col(column).isNull(), lit("1900-01-01")).otherwise(col(column)))

    # Salvar a tabela tratada na camada processed
    output_file = f"{output_path}/{table_name}_treated.parquet"
    df.write.mode("overwrite").parquet(output_file)
    print(f"Tabela '{table_name}' tratada e salva em: {output_file}")

# Caminho da camada processed
processed_path = "/mnt/data/processed/treated_files"

# Tabelas com valores nulos identificadas
tables_with_nulls = [
    ("customers", "/mnt/data/processed/tabelas_parquet/rw_northwind_customers.parquet"),
    ("orders", "/mnt/data/processed/tabelas_parquet/rw_northwind_orders.parquet"),
    ("suppliers", "/mnt/data/processed/tabelas_parquet/rw_northwind_suppliers.parquet"),
    ("employees", "/mnt/data/processed/tabelas_parquet/rw_northwind_employees.parquet")
]

# Iterar pelas tabelas e tratar nulos
for table_name, table_path in tables_with_nulls:
    treat_nulls_by_type_and_save(table_name, table_path, processed_path)


Tratando valores nulos para a tabela: customers
Tabela 'customers' tratada e salva em: /mnt/data/processed/treated_files/customers_treated.parquet
Tratando valores nulos para a tabela: orders
Tabela 'orders' tratada e salva em: /mnt/data/processed/treated_files/orders_treated.parquet
Tratando valores nulos para a tabela: suppliers
Tabela 'suppliers' tratada e salva em: /mnt/data/processed/treated_files/suppliers_treated.parquet
Tratando valores nulos para a tabela: employees
Tabela 'employees' tratada e salva em: /mnt/data/processed/treated_files/employees_treated.parquet


In [0]:
from pyspark.sql.functions import col, lit, when

# Caminho corrigido das tabelas tratadas
treated_tables = {
    "customers": "/mnt/data/processed/treated_files/customers_treated.parquet",
    "orders": "/mnt/data/processed/treated_files/orders_treated.parquet",
    "suppliers": "/mnt/data/processed/treated_files/suppliers_treated.parquet",
    "employees": "/mnt/data/processed/treated_files/employees_treated.parquet",
}

# Caminho para salvar os arquivos finais
final_output_path = "/mnt/data/processed/final_files"

# Iterar pelas tabelas tratadas
for table_name, table_path in treated_tables.items():
    try:
        print(f"Lendo e salvando tabela: {table_name}")
        
        # Ler o arquivo tratado
        df = spark.read.format("parquet").load(table_path)
        
        # Unir todas as partições em um único arquivo
        df = df.coalesce(1)
        
        # Salvar em uma pasta temporária
        temp_output_path = f"{final_output_path}/temp_{table_name}"
        df.write.mode("overwrite").parquet(temp_output_path)
        
        # Identificar e mover o arquivo Parquet único
        temp_dir = f"/dbfs{temp_output_path}"
        final_file_name = f"{table_name}.parquet"
        final_file_path = f"/dbfs{final_output_path}/{final_file_name}"
        
        # Encontrar o arquivo gerado automaticamente
        for file in os.listdir(temp_dir):
            if file.endswith(".parquet"):
                shutil.move(f"{temp_dir}/{file}", final_file_path)
                break
        
        # Remover a pasta temporária
        shutil.rmtree(temp_dir)
        
        print(f"Tabela {table_name} salva como arquivo único em: {final_file_path}")
    
    except Exception as e:
        print(f"Erro ao processar a tabela {table_name}: {e}")


Lendo e salvando tabela: customers
Erro ao processar a tabela customers: [Errno 22] Invalid argument: '/dbfs/mnt/data/processed/final_files/temp_customers'
Lendo e salvando tabela: orders
Erro ao processar a tabela orders: [Errno 22] Invalid argument: '/dbfs/mnt/data/processed/final_files/temp_orders'
Lendo e salvando tabela: suppliers
Erro ao processar a tabela suppliers: [Errno 22] Invalid argument: '/dbfs/mnt/data/processed/final_files/temp_suppliers'
Lendo e salvando tabela: employees
Erro ao processar a tabela employees: [Errno 22] Invalid argument: '/dbfs/mnt/data/processed/final_files/temp_employees'


In [0]:
import os
from pyspark.dbutils import DBUtils

dbutils = DBUtils(spark)

# Caminho do diretório final
final_output_path = "/mnt/data/processed/final_files"

# Mapear os nomes antigos para os novos
file_renames = {
    "rw_customers_final.parquet": "rw_northwind_customers_final.parquet",
    "rw_employees_final.parquet": "rw_northwind_employees_final.parquet",
    "rw_orders_final.parquet": "rw_northwind_orders_final.parquet",
    "rw_suppliers_final.parquet": "rw_northwind_suppliers_final.parquet",
}

# Iterar pelos arquivos para renomear
for old_name, new_name in file_renames.items():
    old_path = f"{final_output_path}/{old_name}"
    new_path = f"{final_output_path}/{new_name}"
    try:
        # Mover (renomear) o arquivo
        dbutils.fs.mv(old_path, new_path)
        print(f"Arquivo {old_name} renomeado para {new_name}")
    except Exception as e:
        print(f"Erro ao renomear {old_name} para {new_name}: {e}")

Erro ao renomear rw_customers_final.parquet para rw_northwind_customers_final.parquet: An error occurred while calling o414.mv.
: java.io.FileNotFoundException: /mnt/data/processed/final_files/rw_customers_final.parquet
	at com.databricks.backend.daemon.data.client.DatabricksFileSystemV2.$anonfun$getFileStatus$2(DatabricksFileSystemV2.scala:1199)
	at com.databricks.s3a.S3AExceptionUtils$.convertAWSExceptionToJavaIOException(DatabricksStreamUtils.scala:66)
	at com.databricks.backend.daemon.data.client.DatabricksFileSystemV2.$anonfun$getFileStatus$1(DatabricksFileSystemV2.scala:1185)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:527)
	at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:631)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:651)
	at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.sca

In [0]:
from pyspark.dbutils import DBUtils

dbutils = DBUtils(spark)

# Caminhos das pastas
source_path = "/mnt/data/processed/tabelas_parquet"
target_path = "/mnt/data/processed/final_files"

# Mapear os arquivos a serem movidos
files_to_move = [
    "rw_northwind_categories.parquet",
    "rw_northwind_customer_customer_demo.parquet",
    "rw_northwind_customer_demographics.parquet",
    "rw_northwind_employee_territories.parquet",
    "rw_northwind_order_details.parquet",
    "rw_northwind_products.parquet",
    "rw_northwind_region.parquet",
    "rw_northwind_shippers.parquet",
    "rw_northwind_territories.parquet",
    "rw_northwind_us_states.parquet"
]

# Mover e renomear arquivos
for file_name in files_to_move:
    old_path = f"{source_path}/{file_name}"
    new_name = file_name.replace(".parquet", "_final.parquet")
    new_path = f"{target_path}/{new_name}"
    
    try:
        dbutils.fs.mv(old_path, new_path)
        print(f"Arquivo {file_name} movido para {new_path}")
    except Exception as e:
        print(f"Erro ao mover o arquivo {file_name}: {e}")


Arquivo rw_northwind_categories.parquet movido para /mnt/data/processed/final_files/rw_northwind_categories_final.parquet
Arquivo rw_northwind_customer_customer_demo.parquet movido para /mnt/data/processed/final_files/rw_northwind_customer_customer_demo_final.parquet
Arquivo rw_northwind_customer_demographics.parquet movido para /mnt/data/processed/final_files/rw_northwind_customer_demographics_final.parquet
Arquivo rw_northwind_employee_territories.parquet movido para /mnt/data/processed/final_files/rw_northwind_employee_territories_final.parquet
Arquivo rw_northwind_order_details.parquet movido para /mnt/data/processed/final_files/rw_northwind_order_details_final.parquet
Arquivo rw_northwind_products.parquet movido para /mnt/data/processed/final_files/rw_northwind_products_final.parquet
Arquivo rw_northwind_region.parquet movido para /mnt/data/processed/final_files/rw_northwind_region_final.parquet
Arquivo rw_northwind_shippers.parquet movido para /mnt/data/processed/final_files/rw_n