## Importando as bibliotecas e criando a spark session com a configuração do Minio para escrever os dados da ingestão

In [6]:
!pip install minio



In [7]:
from pyspark.sql.types import *
import pyspark.sql.functions as fn
from pyspark.sql import SparkSession
from IPython.core.display import HTML
import os
import requests
import re
from bs4 import BeautifulSoup
from minio import Minio
from urllib.parse import urlparse

display(HTML("<style>pre { white-space: pre !important; }</style>"))

spark = (SparkSession.builder
         .config("spark.jars","""/home/jovyan/jars/aws-java-sdk-core-1.11.534.jar,
                                 /home/jovyan/jars/aws-java-sdk-dynamodb-1.11.534.jar,
                                 /home/jovyan/jars/aws-java-sdk-s3-1.11.534.jar,
                                 /home/jovyan/jars/hadoop-aws-3.2.2.jar,
                                 /home/jovyan/jars/postgresql-42.3.3.jar""")
         .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
         .config("spark.hadoop.fs.s3a.access.key", "ZiVYnHFvBt2HumCAXmQG")
         .config("spark.hadoop.fs.s3a.secret.key", "oVNgBEZ5PISLiwF25fJkoazHGXRFB7mJeiBcVygd")
         .config("spark.hadoop.fs.s3a.path.style.access", True)
         .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
         .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
         .getOrCreate()
        )

## Baixando as URL's da página de arquivos dos taxis de NY

In [8]:
# URL da página onde os arquivos Parquet estão disponíveis
url = 'https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page'

# Fazer a requisição HTTP para a página
response = requests.get(url)

# Criar o BeautifulSoup para analisar o conteúdo HTML
soup = BeautifulSoup(response.content, 'html.parser')

# Encontrar todos os links na página
links = soup.find_all('a', href=True)

# Filtrar as URLs que terminam com ".parquet" - alguns arquivos foram salvos com espaço no final da string da url.
parquet_urls = [link['href'] for link in links if link['href'].endswith('.parquet') or link['href'].endswith('.parquet ')]

#Removendo os espaços extras nos objetos do array
parquet_urls = [url.strip() for url in parquet_urls]


# Função para filtrar as URLs
def filter_parquet_urls(urls):
    # Regex para encontrar o nome 'yellow_tripdata' e datas de janeiro a maio de 2023
    filtered_urls = [
        url for url in urls
        if "yellow_tripdata" in url and re.search(r'2023-(0[1-5])\.parquet$', url)
    ]
    return filtered_urls

# Aplicar a filtragem
filtered_urls = filter_parquet_urls(parquet_urls)

# Exibir as URLs filtradas
for url in filtered_urls:
    print(url)


https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-02.parquet
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-03.parquet
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-04.parquet
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-05.parquet


## Salvando os dados parquet no Minio na camada bronze

In [9]:
from minio import Minio

try:
    minio_client = Minio(
        "172.18.0.5:9000",
        access_key="ZiVYnHFvBt2HumCAXmQG",
        secret_key="oVNgBEZ5PISLiwF25fJkoazHGXRFB7mJeiBcVygd",
        secure=False
    )
    buckets = minio_client.list_buckets()
    for bucket in buckets:
        print(f"Bucket: {bucket.name} criado em {bucket.creation_date}")
except Exception as e:
    print(f"Erro ao conectar ao MinIO: {e}")


Bucket: bronze criado em 2024-11-15 22:59:09.254000+00:00
Bucket: gold criado em 2024-11-15 22:59:22.171000+00:00
Bucket: silver criado em 2024-11-15 22:59:16.161000+00:00


In [10]:
bucket_name = "bronze"

# Verifica se o bucket existe
if not minio_client.bucket_exists(bucket_name):
    minio_client.make_bucket(bucket_name)
    print(f"Bucket '{bucket_name}' criado com sucesso.")
else:
    print(f"Bucket '{bucket_name}' já existe.")


Bucket 'bronze' já existe.


In [11]:
# Configuração do MinIO (endpoint local)
minio_endpoint = "http://172.18.0.4:9000"
bucket_name = "bronze"

# Verifica se o bucket existe, se não, cria
minio_client = Minio(
    "172.18.0.5:9000",  # MinIO endpoint
    access_key="ZiVYnHFvBt2HumCAXmQG",  # Access key
    secret_key="oVNgBEZ5PISLiwF25fJkoazHGXRFB7mJeiBcVygd",  # Secret key
    secure=False  # Sem HTTPS
)

# if not minio_client.bucket_exists(bucket_name):
#     minio_client.make_bucket(bucket_name)

# Baixar os arquivos e salvar no MinIO
for parquet_url in filtered_urls:
    # Nome do arquivo local
    file_name = os.path.basename(parquet_url)
    local_path = f'/tmp/{file_name}'

    # Baixar o arquivo Parquet
    response = requests.get(parquet_url)
    with open(local_path, 'wb') as f:
        f.write(response.content)

    print(f'{file_name} baixado com sucesso.')

    # Definir o diretório no bucket (yellow_taxi_files)
    object_name = f"yellow_taxi_files/{file_name}"

    # Subir para o MinIO dentro do diretório yellow_taxi_files
    with open(local_path, 'rb') as f:
        minio_client.put_object(bucket_name, object_name, f, os.stat(local_path).st_size)

    print(f'{file_name} salvo no MinIO em {bucket_name}/{object_name}')

    # Remover o arquivo local temporário
    os.remove(local_path)


yellow_tripdata_2023-01.parquet baixado com sucesso.
yellow_tripdata_2023-01.parquet salvo no MinIO em bronze/yellow_taxi_files/yellow_tripdata_2023-01.parquet
yellow_tripdata_2023-02.parquet baixado com sucesso.
yellow_tripdata_2023-02.parquet salvo no MinIO em bronze/yellow_taxi_files/yellow_tripdata_2023-02.parquet
yellow_tripdata_2023-03.parquet baixado com sucesso.
yellow_tripdata_2023-03.parquet salvo no MinIO em bronze/yellow_taxi_files/yellow_tripdata_2023-03.parquet
yellow_tripdata_2023-04.parquet baixado com sucesso.
yellow_tripdata_2023-04.parquet salvo no MinIO em bronze/yellow_taxi_files/yellow_tripdata_2023-04.parquet
yellow_tripdata_2023-05.parquet baixado com sucesso.
yellow_tripdata_2023-05.parquet salvo no MinIO em bronze/yellow_taxi_files/yellow_tripdata_2023-05.parquet
