In [24]:
import pandas as pd
import warnings
import requests
from datetime import datetime
from google.cloud import bigquery
from google.cloud import storage
from google.oauth2 import service_account
import psutil
import pyarrow.parquet as pq
import io

# Ruta al archivo JSON de las credenciales
key_path = '/Users/negro/Library/CloudStorage/OneDrive-Personal/Documentos/00 Fran/01 - Personales/02-Learn/0. Data Science/0. Data Science/2_projects/1_final/ML/nyc-taxis-and-carbon-emission-3a54f163fa47.json'

# Cargar las credenciales desde el archivo JSON
credentials = service_account.Credentials.from_service_account_file(key_path)

# Crear un cliente de BigQuery utilizando las credenciales cargadas
client = bigquery.Client(credentials=credentials)


In [26]:
# Genera los enlaces de los archivos Parquet para todos los años y meses faltantes desde year_month hasta la fecha actual
enlaces_parquet = []

# Obtener fecha de ejecución
fecha_actual = datetime.now()
fecha_actual_str = fecha_actual.strftime('%Y-%m')

# Obtener fecha más actual de tabla trips
query = '''
    SELECT MAX(time_id) AS max_time_id
    FROM `trip_records_clean.allTrips_2018_2023`
'''
query_job = client.query(query)
result = query_job.result()
row = next(result)

# Obtiene la fecha más actual
max_time_id = row['max_time_id']
fecha_tabla = max_time_id.strftime('%Y-%m')

while fecha_tabla <= fecha_actual_str:
    yellow_tripdata_url = f'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{fecha_tabla}.parquet'
    green_tripdata_url = f'https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_{fecha_tabla}.parquet'

    enlaces_parquet.append(yellow_tripdata_url)
    enlaces_parquet.append(green_tripdata_url)

    fecha_tabla = (datetime.strptime(fecha_tabla, '%Y-%m') + pd.DateOffset(months=1)).strftime('%Y-%m')



In [None]:
#Verificar contenido archivos parquet
def verificar_parquet_file(parquet_url):
    # Verificar si el enlace contiene un archivo Parquet válido
    try:
        response = requests.head(parquet_url)
        if response.status_code == 200 and response.headers.get('content-type') == 'application/octet-stream':
            return True
        else:
            return False
    except requests.exceptions.RequestException:
        return False

# Lista para almacenar los DataFrames
dfs = []

# Procesar cada enlace de archivo Parquet
for parquet_url in enlaces_parquet:
    # Verificar si el enlace contiene un archivo Parquet
    if verificar_parquet_file(parquet_url):
        # Descargar el archivo Parquet
        response = requests.get('parquet_url')
        parquet_content = response.content
        df = pd.read_parquet(parquet_content, engine='pyarrow')

        # Agregar el DataFrame a la lista
        dfs.append(df)

# Concatenar los DataFrames en uno solo
df_concatenated = pd.concat(dfs, ignore_index=True)

In [27]:
enlaces_parquet

['https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-12.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2022-12.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-01.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-02.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-02.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-03.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-03.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-04.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-04.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-05.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-05.parquet'

In [4]:
response = requests.get('https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet')
parquet_content = response.content
df = pd.read_parquet(parquet_content)

In [4]:
response = requests.get('https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet')
parquet_content = response.content
columnas = ['pickup_datetime', 'dropoff_datetime', 'passenger_count', 'trip_distance', 'payment_type', 'pickup_location_id', 'dropoff_location_id', 'total_amount' , 'tip_amount']
df = pd.read_parquet(parquet_content, engine='auto',columns=columnas)

In [12]:
path = '/Users/negro/Downloads/yellow_tripdata_2023-01.parquet'
columnas = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'payment_type', 'PULocationID', 'DOLocationID','payment_type', 'total_amount' , 'tip_amount']

In [14]:
df = pd.read_parquet(path, engine='auto')

In [19]:
import pandas as pd
import numpy as np
import pyarrow.parquet as pq

def read_parquet_with_schema(path, schema):
    # Leer el archivo Parquet con el esquema especificado
    table = pq.read_table(path, columns=list(schema.keys()))

    # Obtener los datos del archivo Parquet según el esquema
    data = {}
    for attr, attr_schema in schema.items():
        column_data = table.column(attr).to_pandas()
        if attr_schema['dtype'] is not None:
            column_data = column_data.astype(attr_schema['dtype'])
            if np.issubdtype(attr_schema['dtype'], np.integer):
                column_data = column_data.replace([np.inf, -np.inf], np.nan).dropna().astype(attr_schema['dtype'])
        data[attr_schema['name']] = column_data

    # Crear el DataFrame con los datos del archivo Parquet y el esquema deseado
    df = pd.DataFrame(data)

    return df

# Ruta del archivo Parquet
path = '/Users/negro/Downloads/yellow_tripdata_2023-01.parquet'

# Esquema deseado para la lectura del archivo Parquet
schema = {
    'tpep_pickup_datetime': {'name': 'pickup_datetime', 'dtype': 'datetime64[ns]'},
    'tpep_dropoff_datetime': {'name': 'dropoff_datetime', 'dtype': 'datetime64[ns]'},
    'passenger_count': {'name': 'passenger_count', 'dtype': float},
    'trip_distance': {'name': 'distance', 'dtype': float},
    'payment_type': {'name': 'payment_type', 'dtype': str},
    'PULocationID': {'name': 'pickup_location', 'dtype': float},
    'DOLocationID': {'name': 'dropoff_location', 'dtype': float},
    'total_amount': {'name': 'total_amount', 'dtype': float},
    'tip_amount': {'name': 'tip_amount', 'dtype': float}
}

# Leer el archivo Parquet con el esquema deseado
df = read_parquet_with_schema(path, schema)



In [20]:
df.head()

Unnamed: 0,pickup_datetime,dropoff_datetime,passenger_count,distance,payment_type,pickup_location,dropoff_location,total_amount,tip_amount
0,2023-01-01 00:32:10,2023-01-01 00:40:36,1.0,0.97,2,161.0,141.0,14.3,0.0
1,2023-01-01 00:55:08,2023-01-01 01:01:27,1.0,1.1,1,43.0,237.0,16.9,4.0
2,2023-01-01 00:25:04,2023-01-01 00:37:49,1.0,2.51,1,48.0,238.0,34.9,15.0
3,2023-01-01 00:03:48,2023-01-01 00:13:25,0.0,1.9,1,138.0,7.0,20.85,0.0
4,2023-01-01 00:10:29,2023-01-01 00:21:19,1.0,1.43,1,107.0,79.0,19.68,3.28


In [25]:
import pandas as pd
import pyarrow.parquet as pq
from google.cloud import storage

def download_parquet_files(links, bucket_name):
    # Crea una instancia del cliente de almacenamiento de Google Cloud
    storage_client = storage.Client()

    # Recorre la lista de enlaces para descargar los archivos Parquet
    for link in links:
        # Obtiene el nombre del archivo Parquet del enlace
        filename = link.split('/')[-1]

        # Descarga el archivo Parquet
        response = requests.get(link)
        parquet_content = response.content

        # Sube el archivo Parquet al bucket en Google Cloud Storage
        bucket = storage_client.get_bucket(bucket_name)
        blob = bucket.blob(filename)
        blob.upload_from_string(parquet_content, content_type='application/octet-stream')

def read_parquet_files_with_schema(links, schema):
    # Lista para almacenar los DataFrames
    dfs = []

    # Recorre la lista de enlaces y procesa cada archivo Parquet
    for link in links:
        # Obtiene el nombre del archivo Parquet del enlace
        filename = link.split('/')[-1]

        # Lee el archivo Parquet con el esquema especificado
        path = f'gs://web_scraping_trips_0/{filename}'
        table = pq.read_table(path, columns=list(schema.keys()))

        # Obtener los datos del archivo Parquet según el esquema
        data = {}
        for attr, attr_schema in schema.items():
            column_data = table.column(attr).to_pandas()
            if attr_schema['dtype'] is not None:
                column_data = column_data.astype(attr_schema['dtype'])
                if pd.api.types.is_integer_dtype(attr_schema['dtype']):
                    column_data = column_data.replace([np.inf, -np.inf], pd.NA).dropna().astype(attr_schema['dtype'])
            data[attr_schema['name']] = column_data

        # Crea el DataFrame con los datos del archivo Parquet y el esquema deseado
        df = pd.DataFrame(data)

        # Agrega el DataFrame a la lista
        dfs.append(df)

    # Concatena los DataFrames en uno solo
    df_concatenated = pd.concat(dfs, ignore_index=True)

    return df_concatenated


# Lista de enlaces de archivos Parquet
links = [
    'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet',
    'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-02.parquet',
    'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-03.parquet'
]

# Esquema deseado para la lectura del archivo Parquet
schema = {
    'tpep_pickup_datetime': {'name': 'pickup_datetime', 'dtype': 'datetime64[ns]'},
    'tpep_dropoff_datetime': {'name': 'dropoff_datetime', 'dtype': 'datetime64[ns]'},
    'passenger_count': {'name': 'passenger_count', 'dtype': float},
    'trip_distance': {'name': 'distance', 'dtype': float},
    'payment_type': {'name': 'payment_type', 'dtype': str},
    'PULocationID': {'name': 'pickup_location', 'dtype': float},
    'DOLocationID': {'name': 'dropoff_location', 'dtype': float},
    'total_amount': {'name': 'total_amount', 'dtype': float},
    'tip_amount': {'name': 'tip_amount', 'dtype': float}
}

# Descarga los archivos Parquet al bucket en Google Cloud Storage
bucket_name = 'web_scraping_trips_0'
download_parquet_files(links, bucket_name)

# Lee los archivos Parquet y crea el DataFrame con el esquema deseado
df = read_parquet_files_with_schema(links, schema)


DefaultCredentialsError: Your default credentials were not found. To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc for more information.