# Extraction des données brutes, ajout des informations du GeoJSON et enregistrement au format Parquet

In [1]:
import os

import geopandas as gpd
import pandas as pd
import psycopg2
from dotenv import load_dotenv
from geoalchemy2 import Geometry
from shapely.geometry import Point
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL

## 1 - Importation des données, descriptif et adaptation des types

In [None]:
# Raw monthly extracts (January, February, March 2016).
path1 = '../data_brut/taxi_trip_2016-01.csv'
path2 = '../data_brut/taxi_trip_2016-02.csv'
path3 = '../data_brut/taxi_trip_2016-03.csv'

# Read each month into its own DataFrame (in file order); they are stacked in the next cell.
data1, data2, data3 = (pd.read_csv(csv_file) for csv_file in (path1, path2, path3))

In [3]:
# Stack the three monthly frames into a single DataFrame.
# ignore_index=True gives one fresh 0..n-1 index. Without it each month restarts
# at 0, so index labels repeat. Any later label-aligned assignment (e.g. attaching
# the PostGIS join results) then silently pairs values with the wrong trips.
data = pd.concat([data1, data2, data3], ignore_index=True)

In [4]:
# Release the per-month frames: their rows now live in `data`.
del data1
del data2
del data3

In [5]:
# Preview the first five rows of the combined frame.
data.head(n=5)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RatecodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
0,2,2016-01-01 00:00:00,2016-01-01 00:00:00,2,1.1,-73.990372,40.734695,1,N,-73.981842,40.732407,2,7.5,0.5,0.5,0.0,0.0,0.3,8.8
1,2,2016-01-01 00:00:00,2016-01-01 00:00:00,5,4.9,-73.980782,40.729912,1,N,-73.944473,40.716679,1,18.0,0.5,0.5,0.0,0.0,0.3,19.3
2,2,2016-01-01 00:00:00,2016-01-01 00:00:00,1,10.54,-73.98455,40.679565,1,N,-73.950272,40.788925,1,33.0,0.5,0.5,0.0,0.0,0.3,34.3
3,2,2016-01-01 00:00:00,2016-01-01 00:00:00,1,4.75,-73.993469,40.71899,1,N,-73.962242,40.657333,2,16.5,0.0,0.5,0.0,0.0,0.3,17.3
4,2,2016-01-01 00:00:00,2016-01-01 00:00:00,3,1.76,-73.960625,40.78133,1,N,-73.977264,40.758514,2,8.0,0.0,0.5,0.0,0.0,0.3,8.8


In [6]:
# (rows, columns) of the concatenated frame; the saved output shows (34499859, 19).
data.shape

(34499859, 19)

In [7]:
# Convert the pickup/dropoff timestamp strings to datetime64 columns, one column at a time.
data[['tpep_pickup_datetime', 'tpep_dropoff_datetime']] = (
    data[['tpep_pickup_datetime', 'tpep_dropoff_datetime']].apply(pd.to_datetime)
)

In [8]:
# Summary of pickup timestamps; the saved output spans 2016-01-01 to 2016-03-31.
data.tpep_pickup_datetime.describe()

count                         34499859
mean     2016-02-16 16:28:47.850749696
min                2016-01-01 00:00:00
25%                2016-01-25 23:54:45
50%                2016-02-17 11:37:35
75%         2016-03-10 00:04:20.500000
max                2016-03-31 23:59:59
Name: tpep_pickup_datetime, dtype: object

In [9]:
# Summary of dropoff timestamps. The saved output shows values outside the
# Jan–Mar 2016 window (min 2015-02-07, max 2016-06-29). These are presumably
# erroneous records to filter during cleaning (TODO confirm).
data.tpep_dropoff_datetime.describe()

count                         34499859
mean     2016-02-16 16:44:17.778065152
min                2015-02-07 15:35:25
25%         2016-01-26 00:11:38.500000
50%                2016-02-17 11:54:02
75%                2016-03-10 00:18:53
max                2016-06-29 15:58:16
Name: tpep_dropoff_datetime, dtype: object

In [10]:
# Remove rows that are exact duplicates across all columns.
# ignore_index=True renumbers the survivors 0..n-1. The frame then keeps a gap-free
# index instead of holes where duplicates were removed, so later row-wise
# enrichment cannot be thrown off by missing labels.
data = data.drop_duplicates(ignore_index=True)

In [11]:
# (rows, columns) after deduplication; the saved output shows 3 rows removed (34499859 -> 34499856).
data.shape


(34499856, 19)

## 2 - Ajout des informations de quartier et d'arrondissement de New York depuis le GeoJSON

In [12]:
# Work on an independent (deep) copy so `data` keeps the deduplicated raw rows.
# NOTE(review): this doubles memory for ~34.5M rows; drop `data` afterwards if memory is tight.
data_cleaned = data.copy(deep=True)

In [None]:
# PostgreSQL configuration

# Load the connection settings from the .env file into the process environment.
load_dotenv()

# Read the connection settings (each is None when the variable is unset).
DB_NAME = os.getenv("DB_NAME")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")

# SQLAlchemy engine used by the rest of the notebook.
# URL.create is used instead of an f-string for two reasons. It escapes characters
# such as '@', ':' or '/' in the password, which would otherwise corrupt the URL.
# It also omits unset fields instead of inserting the literal string "None".
engine = create_engine(
    URL.create(
        drivername="postgresql",
        username=DB_USER,
        password=DB_PASSWORD,
        host=DB_HOST,
        port=int(DB_PORT) if DB_PORT else None,
        database=DB_NAME,
    )
)

In [None]:
# Step 1: enable the PostGIS extension in the configured database.
# The database itself must already exist; nothing here creates it.
def setup_database():
    """Enable PostGIS in the configured database and print its version.

    Connects with psycopg2 using the DB_* settings, runs
    ``CREATE EXTENSION IF NOT EXISTS postgis`` and prints ``postgis_version()``.
    Failures are printed, not raised, so the notebook keeps going.
    The connection is closed in every case.
    """
    conn = None
    try:
        conn = psycopg2.connect(
            dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port=DB_PORT
        )
        conn.autocommit = True  # run the DDL without wrapping it in an explicit transaction
        with conn.cursor() as cursor:
            cursor.execute("CREATE EXTENSION IF NOT EXISTS postgis;")
            cursor.execute("SELECT postgis_version();")  # confirm PostGIS responds
            version = cursor.fetchone()
        print("✅ PostGIS est actif, version :", version[0]) # type: ignore
    except Exception as e:
        print("❌ Erreur lors de l'activation de PostGIS :", e)
    finally:
        # Always release the connection, including when a statement above failed.
        if conn is not None:
            conn.close()

setup_database()

# Cross-check through the SQLAlchemy engine that the later steps use.
with engine.connect() as conn:
    result = conn.execute(text("SELECT postgis_version();"))
    postgis_version = result.scalar()
    print("PostGIS est bien activé, version :", postgis_version)

In [None]:
# Step 2: read the NYC neighborhood polygons (GeoJSON) into a GeoDataFrame.
geojson_path = "data_geojson/custom-pedia-cities-nyc-Mar2018.geojson"
gdf = gpd.read_file(geojson_path)

# List the attribute columns; `borough` and `neighborhood` are used by the spatial join.
print("Colonnes du GeoJSON :", gdf.columns)

In [None]:
# Step 3: write the zones into PostGIS as table `zones_nyc`; any existing table is replaced.
# The geometry column is declared POLYGON with SRID 4326.
# NOTE(review): a POLYGON-typed column rejects MultiPolygon features; switch to
# "GEOMETRY" if another GeoJSON is ever used.
gdf.to_postgis("zones_nyc", engine, if_exists="replace", dtype={"geometry": Geometry("POLYGON", srid=4326)}) # type: ignore

print("Données des zones NYC importées dans PostgreSQL !")

In [None]:
# Step 4: export pickup/dropoff coordinates to CSV so PostgreSQL can ingest them with COPY.
csv_path = "data_clean/points_taxis.csv"

# Make sure the output folder exists; to_csv raises on a missing directory.
os.makedirs(os.path.dirname(csv_path) or ".", exist_ok=True)

# NOTE(review): these shapely point columns are not exported below. PostGIS builds
# its own geometries from the coordinates, and these columns are dropped again
# before the Parquet export. They are expensive on ~34.5M rows; consider removing
# them together with that drop.
data_cleaned['geometry_pickup'] = gpd.points_from_xy(data_cleaned['pickup_longitude'], data_cleaned['pickup_latitude'])
data_cleaned['geometry_dropoff'] = gpd.points_from_xy(data_cleaned['dropoff_longitude'], data_cleaned['dropoff_latitude'])

# Only the four coordinate columns are written, in data_cleaned row order.
# COPY into a SERIAL-keyed table numbers rows in this same order.
data_cleaned[['pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude']].to_csv(csv_path, index=False, encoding='utf-8')

print("Données GPS enregistrées en CSV pour `pickup` et `dropoff` ")


print("Données GPS sauvegardées en CSV pour import rapide ")

In [None]:
# Step 5: drop the spatial indexes before the bulk load, so COPY and the geometry
# UPDATE do not maintain them row by row.
# The list matches every index recreated after loading, including idx_points_dropoff.
with engine.begin() as conn:
    for index_name in ("idx_points_geo", "idx_points_dropoff", "idx_zones_geo"):
        # index_name comes from the fixed tuple above, never from user input.
        conn.execute(text(f"DROP INDEX IF EXISTS {index_name};"))
    print("Index supprimés temporairement pour accélérer l'insertion")

In [None]:
# Step 6: (re)create the staging table that receives the taxi coordinates.
with engine.begin() as conn:
    # Start from an empty table on every run.
    conn.execute(text("DROP TABLE IF EXISTS points_taxis;"))
    # `id` is assigned in insertion order; both geometry columns start NULL and are
    # filled from the coordinates after the COPY.
    conn.execute(text("""
        CREATE TABLE points_taxis (
            id SERIAL PRIMARY KEY,
            pickup_longitude DOUBLE PRECISION,
            pickup_latitude DOUBLE PRECISION,
            dropoff_longitude DOUBLE PRECISION,
            dropoff_latitude DOUBLE PRECISION,
            geometry_pickup GEOMETRY(Point, 4326),
            geometry_dropoff GEOMETRY(Point, 4326)
        );
    """))
    print("Table `points_taxis` créée !")

In [None]:
# Step 7: bulk-load the coordinate CSV with COPY, far faster than row-by-row INSERTs.
# COPY ... FROM STDIN needs the raw psycopg2 connection (cursor.copy_expert).
conn = engine.raw_connection()
try:
    # Create the cursor inside the try, so the connection is released even if this fails.
    cursor = conn.cursor()
    try:
        # Read with the same encoding the CSV was written with.
        with open(csv_path, "r", encoding="utf-8") as f:
            cursor.copy_expert("COPY points_taxis(pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude) FROM STDIN WITH CSV HEADER DELIMITER ',';", f)
        conn.commit()
        print("Données des taxis importées via `COPY` 🚀")
    except Exception:
        # Abandon the partial load explicitly before re-raising.
        conn.rollback()
        raise
    finally:
        cursor.close()
finally:
    conn.close()


In [None]:
# Step 8: fill both geometry columns from the imported coordinates.
# ST_MakePoint takes (x, y) = (longitude, latitude); ST_SetSRID tags the points with SRID 4326.
with engine.begin() as conn:
    conn.execute(
        text("""
        UPDATE points_taxis
        SET geometry_pickup = ST_SetSRID(ST_MakePoint(pickup_longitude, pickup_latitude), 4326),
            geometry_dropoff = ST_SetSRID(ST_MakePoint(dropoff_longitude, dropoff_latitude), 4326);
    """)
    )
    print("Géométries `pickup` et `dropoff` ajoutées !")

In [None]:
# Step 9: rebuild the GiST spatial indexes now that the bulk load is complete.
with engine.begin() as conn:
    for index_name, table_name, geom_column in (
        ("idx_points_geo", "points_taxis", "geometry_pickup"),
        ("idx_points_dropoff", "points_taxis", "geometry_dropoff"),
        ("idx_zones_geo", "zones_nyc", "geometry"),
    ):
        # Names come from the fixed tuple above, never from user input.
        conn.execute(text(f"CREATE INDEX IF NOT EXISTS {index_name} ON {table_name} USING GIST ({geom_column});"))
print("Index recréés après insertion !")

In [None]:
# Step 10: spatial join giving each taxi point its pickup and dropoff zone.
# - Only p.id and the zone labels are selected. p.* would also ship both geometry
#   columns and the raw coordinates for ~34.5M rows, which data_cleaned already holds.
# - DISTINCT ON (p.id) keeps exactly one row per trip, even if a point lies in two
#   overlapping polygons. The LEFT JOINs would otherwise duplicate that trip; which
#   zone wins is then arbitrary.
# - ORDER BY p.id returns rows in COPY (= data_cleaned) order. Without it,
#   PostgreSQL guarantees no row order at all.
# Points outside every polygon (or with NULL geometry) keep NULL zone labels.
query = """
SELECT DISTINCT ON (p.id)
        p.id,
        z_pickup.borough AS pickup_borough,
        z_pickup.neighborhood AS pickup_neighborhood,
        z_dropoff.borough AS dropoff_borough,
        z_dropoff.neighborhood AS dropoff_neighborhood
FROM points_taxis p
LEFT JOIN zones_nyc z_pickup ON ST_Within(p.geometry_pickup, z_pickup.geometry)
LEFT JOIN zones_nyc z_dropoff ON ST_Within(p.geometry_dropoff, z_dropoff.geometry)
ORDER BY p.id;
"""

In [None]:
# Step 11: fetch the join results and attach the zone labels to data_cleaned.
zone_columns = ['pickup_borough', 'pickup_neighborhood', 'dropoff_borough', 'dropoff_neighborhood']

# COPY numbered the rows (SERIAL id) in the order of the CSV, which was written in
# data_cleaned row order. Dedupe on id and sort by it, so row k of `result` is
# row k of data_cleaned whatever order or duplicates the query produces.
result = (
    pd.read_sql(query, engine)
    .drop_duplicates(subset='id')
    .sort_values('id')
    .reset_index(drop=True)
)
assert len(result) == len(data_cleaned), "join result does not match data_cleaned row count"

# Assign by position (.to_numpy()), not by index label. data_cleaned's index can
# hold repeated labels or gaps (concat + drop_duplicates). Label alignment would
# then attach zones to the wrong trips.
for column in zone_columns:
    data_cleaned[column] = result[column].to_numpy()

print("Jointure terminée avec `pickup` et `dropoff` !")
print(data_cleaned[['pickup_latitude', 'pickup_longitude', 'pickup_borough', 'dropoff_latitude', 'dropoff_longitude', 'dropoff_borough', 'pickup_neighborhood', 'dropoff_neighborhood']].head())

✅ PostGIS est actif, version : 3.5 USE_GEOS=1 USE_PROJ=1 USE_STATS=1
✅ PostGIS est bien activé, version : 3.5 USE_GEOS=1 USE_PROJ=1 USE_STATS=1
📌 Colonnes du GeoJSON : Index(['neighborhood', 'boroughCode', 'borough', 'X.id', 'geometry'], dtype='object')
✅ Données des zones NYC importées dans PostgreSQL !
✅ Données GPS enregistrées en CSV pour `pickup` et `dropoff` 🚀
✅ Données GPS sauvegardées en CSV pour import rapide 🚀
🚀 Index supprimés temporairement pour accélérer l'insertion
✅ Table `points_taxis` créée !
✅ Données des taxis importées via `COPY` 🚀
✅ Géométries `pickup` et `dropoff` ajoutées !
✅ Index recréés après insertion !
✅ Jointure terminée avec `pickup` et `dropoff` !
   pickup_latitude  pickup_longitude pickup_borough  dropoff_latitude  \
0        40.734695        -73.990372      Manhattan         40.732407   
1        40.729912        -73.980782      Manhattan         40.716679   
2        40.679565        -73.984550       Brooklyn         40.788925   
3        40.718990   

In [16]:
# Remove the in-memory shapely point columns; they are not wanted in the Parquet output.
data_cleaned = data_cleaned.drop(columns=['geometry_pickup', 'geometry_dropoff'])

## 3 - Enregistrement au format Parquet

In [None]:
# Persist the enriched trips as Parquet; the pandas index is not written.
data_cleaned.to_parquet(
    'data_clean/taxi_trip_2016-preclean.parquet',
    index=False,
)
print("Fichier enregistré en Parquet avec succès !")

✅ Fichier enregistré en Parquet avec succès ! 🚀
