In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import requests
import json
import time

In [0]:
spark = SparkSession.builder.appName("API_Bank_Data").getOrCreate()

API_KEY = "FkA7wMzG06boNdftBzLfcWo76Y50sdxH"

per_page = 50
page = 1  
max_pages = 5  
country_codes = ["AD", "DE", "FR", "IT"]  
banks_list = []  


In [0]:
for country_code in country_codes:
    page = 1  
    while page <= max_pages:
        url = f"https://api.apilayer.com/bank_data/all?country={country_code}&per_page={per_page}&page={page}"
        headers = {"apikey": API_KEY}
        
        response = requests.get(url, headers=headers)

        if response.status_code == 200:
            data = response.json()
            bank_data = data.get("data", [])

            if not bank_data:
                print(f" Nenhum dado retornado na página {page} para {country_code}. Encerrando busca.")
                break

            for bank in bank_data:
                banks_list.append(Row(
                    country=bank["swift_data"].get("country"),
                    country_code=bank["swift_data"].get("country_code"),
                    bank_name=bank["swift_data"].get("name"),
                    branch=bank["swift_data"].get("branch"),
                    city=bank["swift_data"].get("city"),
                    zip_code=bank["swift_data"].get("zip"),
                    swift_code=bank["swift_data"].get("swift"),
                    iban=bank["iban_data"].get("iban"),
                    bank_code=bank["iban_data"].get("bank_code"),
                    account_number=bank["iban_data"].get("account_number"),
                    checksum=bank["iban_data"].get("checksum")
                ))

            print(f" Página {page} carregada com sucesso para {country_code}!")
            page += 1
            time.sleep(1)  

        elif response.status_code == 404:
            print(f" Erro 404: Endpoint não encontrado. Verifique a URL.")
            break
        elif response.status_code == 429:
            print(" Taxa limite excedida. Aguardando para continuar...")
            time.sleep(60)
        else:
            print(f" Erro na API: {response.status_code}, {response.text}")
            break

if banks_list:
    df_spark = spark.createDataFrame(banks_list)

    df_spark.show(truncate=False)

    df_spark.write.mode("overwrite").partitionBy("country_code").parquet("/mnt/datalake/bank_data_partitioned")

    print(" Dados salvos com particionamento por country_code!")
else:
    print(" Nenhum dado coletado. Nenhum DataFrame criado.")


 Página 1 carregada com sucesso para AD!
 Nenhum dado retornado na página 2 para AD. Encerrando busca.
 Página 1 carregada com sucesso para DE!
 Página 2 carregada com sucesso para DE!
 Página 3 carregada com sucesso para DE!
 Página 4 carregada com sucesso para DE!
 Página 5 carregada com sucesso para DE!
 Página 1 carregada com sucesso para FR!
 Página 2 carregada com sucesso para FR!
 Página 3 carregada com sucesso para FR!
 Página 4 carregada com sucesso para FR!
 Página 5 carregada com sucesso para FR!
 Página 1 carregada com sucesso para IT!
 Página 2 carregada com sucesso para IT!
 Página 3 carregada com sucesso para IT!
 Página 4 carregada com sucesso para IT!
 Página 5 carregada com sucesso para IT!
+-------+------------+----------------------------------------------------------------------------+-----------------------------------------------------------------------------------+------------------+--------+-----------+------------------------+---------+--------------+--------+