## Script de Importación de Tráfico de NYC a MySQL

In [None]:
import os
from dotenv import load_dotenv
import pandas as pd
import mysql.connector
from datetime import datetime

# Import NYC automated traffic volume counts from a CSV file into the MySQL
# table `trafico`, keeping only rows dated January 2023 through August 2024.

# Load environment variables from the .env file
load_dotenv()

# Read the database connection settings from the environment
db_host = os.getenv('DB_HOST')
db_user = os.getenv('DB_USER')
db_password = os.getenv('DB_PASSWORD')
db_name = os.getenv('DB_NAME')
db_port = os.getenv('DB_PORT')
csv_file_path = 'Automated_Traffic_Volume_Counts.csv'  # Update this to point at the correct CSV file

# Fail early when any connection setting is missing or empty
if not all([db_host, db_user, db_password, db_name, db_port]):
    raise ValueError("No se pudieron cargar todas las variables de entorno. Por favor verifica el archivo .env.")

# Load the CSV file into a pandas DataFrame
try:
    df = pd.read_csv(csv_file_path)
except FileNotFoundError as e:
    # Chain the original error so the underlying cause stays in the traceback
    raise FileNotFoundError(f"El archivo {csv_file_path} no se encuentra. Por favor verifica la ruta.") from e

# Date window: January 2023 through August 2024.
# end_date is an EXCLUSIVE upper bound (first instant of September 2024) so that
# every reading taken during August 31 is kept; comparing with
# `<= datetime(2024, 8, 31)` would drop everything after 00:00 on that day.
start_date = datetime(2023, 1, 1)
end_date = datetime(2024, 9, 1)

# Make sure the date/time component columns are integers
for component in ['Yr', 'M', 'D', 'HH', 'MM']:
    df[component] = df[component].astype(int)

# Build a single datetime column from the components
# (pd.to_datetime expects the columns to be named year/month/day/hour/minute)
df['datetime'] = pd.to_datetime(
    df[['Yr', 'M', 'D', 'HH', 'MM']].rename(
        columns={'Yr': 'year', 'M': 'month', 'D': 'day', 'HH': 'hour', 'MM': 'minute'}
    )
)

df_filtered = df[(df['datetime'] >= start_date) & (df['datetime'] < end_date)].copy()

# Keep only the columns needed for the analysis
# NOTE(review): 'street' is kept here but is not part of the INSERT below - confirm
# whether the `trafico` table should also store it.
columns_to_keep = ['RequestID', 'Boro', 'Yr', 'M', 'D', 'HH', 'MM', 'Vol', 'SegmentID', 'WktGeom', 'street', 'Direction']
df_filtered = df_filtered[columns_to_keep]

# Connect to the MySQL database
try:
    db_connection = mysql.connector.connect(
        host=db_host,
        user=db_user,
        password=db_password,
        database=db_name,
        port=int(db_port)
    )
except mysql.connector.Error as err:
    raise mysql.connector.Error(f"Error al conectar a la base de datos: {err}") from err

# The statement and its column list do not change between chunks, so build them once
insert_columns = ['RequestID', 'Boro', 'Yr', 'M', 'D', 'HH', 'MM', 'Vol', 'SegmentID', 'WktGeom', 'Direction']
insert_query = """
    INSERT INTO trafico (RequestID, Boro, Yr, M, D, HH, MM, Vol, SegmentID, WktGeom, Direction)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """

# Insert the filtered rows in chunks of 100,000 rows
chunk_size = 100000
rows_inserted = 0
cursor = db_connection.cursor()
try:
    for start in range(0, len(df_filtered), chunk_size):
        chunk = df_filtered.iloc[start:start + chunk_size]

        # itertuples() yields plain tuples in column order and is much faster than iterrows()
        data = list(chunk[insert_columns].itertuples(index=False, name=None))

        cursor.executemany(insert_query, data)
        rows_inserted += len(data)

        # Report progress only every 10 chunks
        if start // chunk_size % 10 == 0:
            print(f"Se han insertado {len(data)} filas en este bloque. Total filas insertadas: {rows_inserted}")

    # Commit once, after every chunk has been sent
    db_connection.commit()
finally:
    # Always release the cursor and the connection, even when an insert fails
    cursor.close()
    db_connection.close()

# Show the total number of inserted rows
print(f"Se han insertado un total de {rows_inserted} filas en la base de datos.")


## Script de Importación de Temperaturas Promedio de NYC a MySQL


In [2]:
import os
from dotenv import load_dotenv
import pandas as pd
import mysql.connector

# Import the monthly average temperature CSV into the MySQL table `temperaturas`.

# Load environment variables from the .env file
load_dotenv()

# Read the database connection settings from the environment
db_host = os.getenv('DB_HOST')
db_user = os.getenv('DB_USER')
db_password = os.getenv('DB_PASSWORD')
db_name = os.getenv('DB_NAME')
db_port = os.getenv('DB_PORT')
csv_file_path = 'temperaturas_promedio_nyc_mensual.csv'  # Update this to point at the correct CSV file

# Fail early when any connection setting is missing or empty
if not all([db_host, db_user, db_password, db_name, db_port]):
    raise ValueError("No se pudieron cargar todas las variables de entorno. Por favor verifica el archivo .env.")

# Load the CSV file into a pandas DataFrame
try:
    df = pd.read_csv(csv_file_path)
except FileNotFoundError as e:
    # Chain the original error so the underlying cause stays in the traceback
    raise FileNotFoundError(f"El archivo {csv_file_path} no se encuentra. Por favor verifica la ruta.") from e

# Connect to the MySQL database
try:
    db_connection = mysql.connector.connect(
        host=db_host,
        user=db_user,
        password=db_password,
        database=db_name,
        port=int(db_port)
    )
except mysql.connector.Error as err:
    raise mysql.connector.Error(f"Error al conectar a la base de datos: {err}") from err

# One parameter tuple per CSV row, in the CSV's column order.
# The CSV is assumed to have exactly the six columns named in the INSERT - TODO confirm.
insert_query = """
INSERT INTO temperaturas (Mes, Manhattan, Brooklyn, Queens, The_Bronx, Staten_Island)
VALUES (%s, %s, %s, %s, %s, %s)
"""
# itertuples() yields built-in Python values per column, whereas iterating
# df.values can yield NumPy scalars when the frame has a single numeric dtype.
data = list(df.itertuples(index=False, name=None))

cursor = db_connection.cursor()
try:
    cursor.executemany(insert_query, data)

    # Commit only after every row has been sent
    db_connection.commit()
finally:
    # Always release the cursor and the connection, even when the insert fails
    cursor.close()
    db_connection.close()

# Show the number of inserted rows
print(f"Se han insertado {len(data)} filas en la base de datos.")


Se han insertado 12 filas en la base de datos.


## Script de Importación de Taxi_Zones de NYC a MySQL


In [None]:
import os
from dotenv import load_dotenv
import pandas as pd
import mysql.connector

# Import the taxi zone lookup CSV into the MySQL table `taxi_zones`,
# printing a progress message at every step.

# Load environment variables from the .env file
print("Cargando las variables de entorno...")
load_dotenv()

# Read the database connection settings from the environment
db_host = os.getenv('DB_HOST')
db_user = os.getenv('DB_USER')
db_password = os.getenv('DB_PASSWORD')
db_name = os.getenv('DB_NAME')
db_port = os.getenv('DB_PORT')
csv_file_path = 'taxi_zones.csv'  # Update this to point at the correct CSV file

# Fail early when any connection setting is missing or empty
if not all([db_host, db_user, db_password, db_name, db_port]):
    raise ValueError("No se pudieron cargar todas las variables de entorno. Por favor verifica el archivo .env.")
print("Variables de entorno cargadas exitosamente.")

# Load the CSV file into a pandas DataFrame
print(f"Cargando el archivo CSV desde {csv_file_path}...")
try:
    df = pd.read_csv(csv_file_path)
except FileNotFoundError as e:
    # Chain the original error so the underlying cause stays in the traceback
    raise FileNotFoundError(f"El archivo {csv_file_path} no se encuentra. Por favor verifica la ruta.") from e
print("Archivo CSV cargado exitosamente.")

# Print the column names so a schema mismatch is easy to spot
print("Verificando los nombres de las columnas...")
print(df.columns)

# Connect to the MySQL database
print("Conectando a la base de datos MySQL...")
try:
    db_connection = mysql.connector.connect(
        host=db_host,
        user=db_user,
        password=db_password,
        database=db_name,
        port=int(db_port)
    )
except mysql.connector.Error as err:
    raise mysql.connector.Error(f"Error al conectar a la base de datos: {err}") from err
print("Conexión a la base de datos exitosa.")

cursor = db_connection.cursor()
try:
    # Insert the DataFrame rows into the MySQL table
    print("Insertando datos en la tabla taxi_zones en la base de datos...")
    insert_query = """
INSERT INTO taxi_zones (LocationID, Borough, Zone, service_zone)
VALUES (%s, %s, %s, %s)
"""
    # Select the four inserted columns explicitly; itertuples() yields plain
    # tuples in that order and is much faster than iterrows().
    data = list(
        df[['LocationID', 'Borough', 'Zone', 'service_zone']].itertuples(index=False, name=None)
    )

    cursor.executemany(insert_query, data)

    # Commit the changes and close the connection
    print("Confirmando los cambios y cerrando la conexión a la base de datos...")
    db_connection.commit()
finally:
    # Always release the cursor and the connection, even when the insert fails
    cursor.close()
    db_connection.close()
print("Conexión cerrada.")

# Show the total number of inserted rows
print(f"Se han insertado un total de {len(data)} filas en la tabla taxi_zones en la base de datos.")


## Script de Importación de Tráfico de NYC a MySQL convirtiendo geometrías WKT a objetos Shapely

Se requiere la librería geopandas.

In [None]:
import pandas as pd
import logging
from sqlalchemy import create_engine, text
from datetime import datetime
from typing import Optional, Dict, Any
import numpy as np

class NYCTaxiTemperatureETL:
    """ETL job that enriches taxi trips with monthly borough temperatures.

    The job reads trips from ``taxi_fhv_data`` in chunks, derives pickup date
    parts and a pickup borough, joins the monthly values stored in
    ``temperaturas`` and writes the result to ``enriched_taxi_data``.
    """

    # Borough columns of the ``temperaturas`` table, in the order used to pick
    # the single ``temperature`` value of a trip.
    _BOROUGH_COLUMNS = ('Manhattan', 'Brooklyn', 'Queens', 'The_Bronx', 'Staten_Island')

    # Columns written to ``enriched_taxi_data``; they double as bind-parameter names.
    _ENRICHED_COLUMNS = (
        'Pickup_datetime', 'DropOff_datetime',
        'PULocationID', 'DOLocationID',
        'pickup_borough', 'temperature',
        'pickup_year', 'pickup_month', 'pickup_day', 'pickup_hour', 'pickup_minute',
    )

    def __init__(self, user: str, password: str, host: str, database: str = 'UrbanTransit'):
        """Store the connection URL, configure logging and build lookup tables.

        Args:
            user: Database user name.
            password: Database password.
            host: Database host (placed verbatim in the URL).
            database: Schema name; defaults to ``'UrbanTransit'``.
        """
        # Function-scope import: only this method needs it.
        from urllib.parse import quote

        # Percent-encode the credentials so characters such as '@', ':' or '/'
        # cannot corrupt the URL; plain alphanumeric values are left unchanged.
        safe_user = quote(user, safe='')
        safe_password = quote(password, safe='')
        self.connection_string = f'mysql+mysqlconnector://{safe_user}:{safe_password}@{host}/{database}'

        # Configure logging; the handler is attached only once per logger so
        # creating several instances does not duplicate log lines.
        self.logger = logging.getLogger(__name__)
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
            self.logger.setLevel(logging.INFO)

        # Initialize lookup tables
        self.borough_zones = self._initialize_zone_mappings()

    def create_engine(self):
        """Create and return a new database engine with connection pooling.

        Every call builds a new engine; the caller is responsible for
        disposing it.
        """
        return create_engine(
            self.connection_string,
            pool_recycle=3600,
            pool_pre_ping=True,
            pool_size=5,
            max_overflow=10
        )

    def _initialize_zone_mappings(self) -> Dict[int, str]:
        """Return the mapping from location ID to borough name.

        NOTE(review): the values are hard-coded examples (IDs 1-5 only); this
        would ideally come from a database table.
        """
        return {
            1: "Manhattan",
            2: "Brooklyn",
            3: "Queens",
            4: "The_Bronx",
            5: "Staten_Island"
        }

    def _get_borough_from_location(self, location_id: int) -> str:
        """Return the borough for ``location_id``, or ``"Unknown"`` if unmapped."""
        return self.borough_zones.get(location_id, "Unknown")

    @staticmethod
    def _to_db_value(value: Any) -> Any:
        """Convert one pandas/NumPy scalar into a plain Python value.

        Missing values (NaN, NaT, None) become ``None`` so they are stored as
        NULL; timestamps become ``datetime`` and NumPy scalars their built-in
        equivalent. Anything else is returned unchanged.
        """
        if pd.isna(value):
            return None
        if isinstance(value, pd.Timestamp):
            return value.to_pydatetime()
        if isinstance(value, np.generic):
            return value.item()
        return value

    def process_taxi_chunk(self, chunk: pd.DataFrame) -> pd.DataFrame:
        """Add pickup date parts and the pickup borough to a chunk of trips.

        The frame is modified in place and also returned. It must contain
        ``Pickup_datetime`` and ``PULocationID``.
        """
        # Convert datetime columns
        for col in ['Pickup_datetime', 'DropOff_datetime']:
            if col in chunk.columns:
                chunk[col] = pd.to_datetime(chunk[col])

        # Extract datetime components for joining with temperature data
        pickup = chunk['Pickup_datetime'].dt
        chunk['pickup_year'] = pickup.year
        chunk['pickup_month'] = pickup.month
        chunk['pickup_day'] = pickup.day
        chunk['pickup_hour'] = pickup.hour
        chunk['pickup_minute'] = pickup.minute

        # Get borough information
        chunk['pickup_borough'] = chunk['PULocationID'].map(self._get_borough_from_location)

        return chunk

    def get_temperature_data(self, start_date: str, end_date: str) -> pd.DataFrame:
        """Return the ``temperaturas`` rows whose month falls in the date range.

        NOTE(review): the filter hard-codes the year 2024 and builds a date
        from ``Mes``, so it presumably expects ``Mes`` to be a month number -
        verify against the table contents.

        Raises:
            Exception: any error raised while querying is logged and re-raised.
        """
        query = """
        SELECT 
            Mes,
            Manhattan, Brooklyn, Queens, The_Bronx, Staten_Island
        FROM temperaturas
        WHERE DATE_FORMAT(STR_TO_DATE(CONCAT('2024-', Mes, '-01'), '%Y-%m-%d'), '%Y-%m') BETWEEN DATE_FORMAT(:start_date, '%Y-%m') AND DATE_FORMAT(:end_date, '%Y-%m')
        """

        # Start as None so the cleanup below is safe even if engine creation fails
        engine = None
        try:
            engine = self.create_engine()
            with engine.connect() as conn:
                temperature_data = pd.read_sql_query(
                    text(query),
                    con=conn,
                    params={'start_date': start_date, 'end_date': end_date}
                )
            return temperature_data
        except Exception as e:
            self.logger.error(f"Error fetching temperature data: {str(e)}")
            raise
        finally:
            if engine is not None:
                engine.dispose()

    def enrich_with_temperature(self, taxi_chunk: pd.DataFrame, temperature_data: pd.DataFrame) -> pd.DataFrame:
        """Join monthly temperatures onto the trips and pick the borough's value.

        The result contains one ``<borough>_temperature`` column per borough
        (filled only where the trip's ``pickup_borough`` matches) plus a
        ``temperature`` column holding the first non-missing of them. Trips
        whose borough or month has no match get a missing temperature.
        """
        # Create month key for joining
        taxi_chunk['pickup_month'] = taxi_chunk['Pickup_datetime'].dt.month

        # Left join keeps every trip, even when no temperature row matches
        merged = pd.merge(
            taxi_chunk,
            temperature_data,
            left_on='pickup_month',
            right_on='Mes',
            how='left'
        )

        # Column-wise masking replaces a row-by-row apply(); it is far faster
        # and also works when the chunk has no rows.
        temperature_columns = []
        for borough in self._BOROUGH_COLUMNS:
            column = borough + '_temperature'
            merged[column] = merged[borough].where(merged['pickup_borough'] == borough)
            temperature_columns.append(column)

        # At most one borough column is filled per row; take the first non-missing
        merged['temperature'] = merged[temperature_columns].bfill(axis=1).iloc[:, 0]

        return merged

    def save_enriched_data(self, enriched_data: pd.DataFrame) -> None:
        """Insert the enriched trips into ``enriched_taxi_data``.

        All rows are sent in one transaction; on error nothing is committed.

        Raises:
            Exception: any error raised while saving is logged and re-raised.
        """
        if enriched_data.empty:
            # Nothing to write
            return

        # text() statements take named binds (":name"); "%s" placeholders with a
        # positional tuple are not accepted by Connection.execute().
        insert_statement = text("""
        INSERT INTO enriched_taxi_data (
            Pickup_datetime, DropOff_datetime, 
            PULocationID, DOLocationID,
            pickup_borough, temperature,
            pickup_year, pickup_month, pickup_day, pickup_hour, pickup_minute
        ) VALUES (
            :Pickup_datetime, :DropOff_datetime,
            :PULocationID, :DOLocationID,
            :pickup_borough, :temperature,
            :pickup_year, :pickup_month, :pickup_day, :pickup_hour, :pickup_minute
        )
        """)

        engine = None
        try:
            columns = list(self._ENRICHED_COLUMNS)
            # One dict per row, with driver-friendly values
            records = [
                {name: self._to_db_value(value) for name, value in zip(columns, row)}
                for row in enriched_data[columns].itertuples(index=False, name=None)
            ]

            engine = self.create_engine()
            # begin() commits on success and rolls back if the block raises;
            # passing a list of dicts sends the rows as a single executemany.
            with engine.begin() as conn:
                conn.execute(insert_statement, records)
        except Exception as e:
            self.logger.error(f"Error saving enriched data: {str(e)}")
            raise
        finally:
            if engine is not None:
                engine.dispose()

    def run_etl_process(self, start_date: str, end_date: str, chunksize: int = 100000) -> None:
        """Run the complete ETL process for trips picked up in the date range.

        Args:
            start_date: Lower bound for ``Pickup_datetime`` (inclusive).
            end_date: Upper bound for ``Pickup_datetime`` (inclusive, SQL BETWEEN).
            chunksize: Number of trips read and written per iteration.

        Raises:
            Exception: any error is logged and re-raised.
        """
        self.logger.info(f"Starting ETL process for dates {start_date} to {end_date}")
        engine = None

        try:
            # Get temperature data for the entire period
            temperature_data = self.get_temperature_data(start_date, end_date)
            self.logger.info(f"Retrieved {len(temperature_data)} temperature records")

            # NOTE(review): paging with LIMIT/OFFSET and no ORDER BY does not
            # guarantee a stable row order between queries - consider ordering
            # by a unique key of taxi_fhv_data.
            chunk_query = text("""
                SELECT * FROM taxi_fhv_data 
                WHERE Pickup_datetime BETWEEN :start_date AND :end_date
                LIMIT :limit OFFSET :offset
                """)

            # One engine for the whole run; creating one per chunk would leave
            # every pool but the last undisposed.
            engine = self.create_engine()

            offset = 0
            total_inserted_rows = 0

            while True:
                # Get taxi data chunk
                with engine.connect() as conn:
                    taxi_chunk = pd.read_sql_query(
                        chunk_query,
                        con=conn,
                        params={
                            'start_date': start_date,
                            'end_date': end_date,
                            'limit': int(chunksize),
                            'offset': offset,
                        }
                    )

                if taxi_chunk.empty:
                    self.logger.info("No more taxi data to process")
                    break

                self.logger.info(f"Processing chunk of {len(taxi_chunk)} records")

                # Process the chunk
                processed_chunk = self.process_taxi_chunk(taxi_chunk)
                enriched_chunk = self.enrich_with_temperature(processed_chunk, temperature_data)

                # Save the enriched data
                self.save_enriched_data(enriched_chunk)

                total_inserted_rows += len(enriched_chunk)

                self.logger.info(f"Completed processing chunk, offset: {offset}")
                offset += chunksize

            self.logger.info(f"Total rows inserted: {total_inserted_rows}")

        except Exception as e:
            self.logger.error(f"Error in ETL process: {str(e)}")
            raise
        finally:
            if engine is not None:
                engine.dispose()

if __name__ == "__main__":
    # Read the credentials from the environment (.env), as the import scripts
    # in this file do, instead of hard-coding them in source.
    import os
    from dotenv import load_dotenv

    load_dotenv()
    db_user = os.getenv('DB_USER')
    db_password = os.getenv('DB_PASSWORD')
    db_host = os.getenv('DB_HOST')

    # Fail early when any connection setting is missing or empty
    if not all([db_user, db_password, db_host]):
        raise ValueError("No se pudieron cargar todas las variables de entorno. Por favor verifica el archivo .env.")

    # Initialize ETL process (the database name keeps the class default)
    etl = NYCTaxiTemperatureETL(
        user=db_user,
        password=db_password,
        host=db_host
    )

    # Run ETL process for a specific date range
    etl.run_etl_process(
        start_date='2024-01-01',
        end_date='2024-01-31',
        chunksize=100000
    )


2024-11-18 21:54:32,608 - INFO - Starting ETL process for dates 2024-01-01 to 2024-01-31
2024-11-18 21:54:36,880 - INFO - Retrieved 0 temperature records
