In [0]:
# ADLS Gen2 coordinates of the landing zone used by every cell of this notebook.
storage_account = "taxistorag"
container = "datalaketaxi"

# Root folder of the raw taxi data; note the trailing slash.
root_path = "abfss://{}@{}.dfs.core.windows.net/taxi_data/".format(container, storage_account)

# Show what is currently sitting in the landing folder.
landing_listing = dbutils.fs.ls(root_path)
display(landing_listing)


path,name,size,modificationTime
abfss://datalaketaxi@taxistorag.dfs.core.windows.net/taxi_data/archive/,archive/,0,1765132667000
abfss://datalaketaxi@taxistorag.dfs.core.windows.net/taxi_data/file_history/,file_history/,0,1765210851000
abfss://datalaketaxi@taxistorag.dfs.core.windows.net/taxi_data/lookup/,lookup/,0,1765131913000
abfss://datalaketaxi@taxistorag.dfs.core.windows.net/taxi_data/yellow_tripdata_2024-08.parquet,yellow_tripdata_2024-08.parquet,51067350,1765211337000


In [0]:
%sql
-- Switch the session to the hive_metastore catalog; unqualified schema names
-- used by the following cells are resolved inside it.
USE CATALOG hive_metastore;



In [0]:
%sql
-- 1. Create the Bronze database, stored in ADLS (no-op when it already exists).
CREATE DATABASE IF NOT EXISTS bronze
    LOCATION 'abfss://datalaketaxi@taxistorag.dfs.core.windows.net/bronze/';

-- 2. Rebuilding the Bronze table from the legacy taxi.bronze_db.trips_data table
--    used to be drafted here as commented-out SQL; that step now lives in the next cell.

In [0]:
%sql
-- Rebuild bronze.trips_data as a Delta copy of the legacy taxi.bronze_db.trips_data table.
-- WARNING: the table is overwritten with the legacy contents only, so rows appended
-- afterwards by the ingestion cell are lost if this cell is run again.
USE hive_metastore.bronze;
SELECT current_database();
-- CREATE OR REPLACE swaps the table contents in one atomic statement; the previous
-- DROP TABLE + CREATE TABLE pair left no table at all whenever the CTAS failed.
CREATE OR REPLACE TABLE trips_data
USING DELTA
AS
SELECT * FROM taxi.bronze_db.trips_data;



num_affected_rows,num_inserted_rows


In [0]:
%python
# List the landing folder again to see which files are waiting before ingestion.
files = dbutils.fs.ls(root_path)
display(files)


path,name,size,modificationTime
abfss://datalaketaxi@taxistorag.dfs.core.windows.net/taxi_data/archive/,archive/,0,1765132667000
abfss://datalaketaxi@taxistorag.dfs.core.windows.net/taxi_data/file_history/,file_history/,0,1765210851000
abfss://datalaketaxi@taxistorag.dfs.core.windows.net/taxi_data/lookup/,lookup/,0,1765131913000
abfss://datalaketaxi@taxistorag.dfs.core.windows.net/taxi_data/yellow_tripdata_2024-08.parquet,yellow_tripdata_2024-08.parquet,51067350,1765211337000


In [0]:
%python 
# Ingest every parquet file found directly under root_path into the Bronze Delta table,
# verify the row counts, then move the ingested files to a timestamped archive folder.
from pyspark.sql.functions import current_timestamp, col, substring_index
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType, LongType
import datetime

# Fully qualified target so the write and the control counts always hit the same table,
# whatever the session's current database happens to be.
target_table = "bronze.trips_data"

# Schema applied when reading the trip files.
schema = StructType([
    StructField("VendorID", IntegerType(), True),
    StructField("tpep_pickup_datetime", TimestampType(), True),
    StructField("tpep_dropoff_datetime", TimestampType(), True),
    StructField("passenger_count", LongType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("RatecodeID", LongType(), True),
    StructField("store_and_fwd_flag", StringType(), True),
    StructField("PULocationID", IntegerType(), True),
    StructField("DOLocationID", IntegerType(), True),
    StructField("payment_type", LongType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("extra", DoubleType(), True),
    StructField("mta_tax", DoubleType(), True),
    StructField("tip_amount", DoubleType(), True),
    StructField("tolls_amount", DoubleType(), True),
    StructField("improvement_surcharge", DoubleType(), True),
    StructField("total_amount", DoubleType(), True),
    StructField("congestion_surcharge", DoubleType(), True),
    StructField("Airport_fee", DoubleType(), True)
])

# Source folder (name kept as a notebook global in case other cells reference it).
filePath = root_path

# Snapshot the parquet files ONCE: the same list drives both the read and the archive
# step, so a file that lands while this cell is running is never archived un-ingested.
files = [f for f in dbutils.fs.ls(root_path) if f.name.endswith(".parquet")]
if files:
    # Read exactly the listed files rather than the whole folder.
    source_paths = [f.path for f in files]
    df = spark.read.schema(schema).parquet(*source_paths)
    display(df)

    # Add the ingestion timestamp and the source file name (last segment of the file path).
    df = (
        df.withColumn("ingest_time", current_timestamp())
          .withColumn("file_name", substring_index(col("_metadata.file_path"), "/", -1))
    )

    # Number of rows about to be ingested.
    added_rows_count = df.count()

    # Row count of the target table before the append (0 when the table does not exist yet).
    if "trips_data" in [t.name for t in spark.catalog.listTables("bronze")]:
        old_rows_count = spark.table(target_table).count()
    else:
        old_rows_count = 0
    print(f"Nombre de lignes dans la table : {old_rows_count}")
    print(f"Nombre de lignes à ingérer : {added_rows_count}")

    # Append the new rows to the Bronze table.
    df.write.format("delta").mode("append").option("mergeSchema", True).saveAsTable(target_table)

    # Control: the table must have grown by exactly the number of ingested rows.
    if spark.table(target_table).count() != old_rows_count + added_rows_count:
        raise Exception("Controle de l'insertion des données échoué")
    else:
        print("Insertion des données réussie")

    # Archive: root_path already ends with "/", so strip it before joining to avoid "//".
    archive_stamp = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
    archive_folder = root_path.rstrip("/") + "/archive/" + archive_stamp + "/"

    # Move only the files that were actually ingested above.
    for parquet_file in files:
        dbutils.fs.mv(parquet_file.path, archive_folder + parquet_file.name)
    print(f"Fichier archivés dans {archive_folder}")

    # Show the content of the archive folder.
    display(dbutils.fs.ls(archive_folder))
else:
    print("Aucun fichier parquet trouvé dans le dossier source.")


VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
1,2024-08-01T00:21:00Z,2024-08-01T00:36:13Z,1,7.4,1,N,138,80,1,28.9,7.75,0.5,7.65,0.0,1.0,45.8,0.0,1.75
2,2024-08-01T00:20:01Z,2024-08-01T00:41:47Z,1,9.91,1,N,138,239,1,40.8,6.0,0.5,11.55,6.94,1.0,71.04,2.5,1.75
1,2024-08-01T00:17:52Z,2024-08-01T00:41:45Z,0,13.4,1,N,138,88,1,52.0,10.25,0.5,15.0,0.0,1.0,78.75,2.5,1.75
1,2024-08-01T00:49:08Z,2024-08-01T00:55:56Z,0,3.9,1,N,209,137,3,17.0,3.5,0.5,0.0,0.0,1.0,22.0,2.5,0.0
1,2024-08-01T00:38:52Z,2024-08-01T00:42:34Z,1,0.4,1,N,148,144,2,5.1,3.5,0.5,0.0,0.0,1.0,10.1,2.5,0.0
1,2024-08-01T00:57:59Z,2024-08-01T01:03:14Z,1,0.4,1,N,148,144,1,5.8,3.5,0.5,2.15,0.0,1.0,12.95,2.5,0.0
2,2024-08-01T00:15:46Z,2024-08-01T00:29:45Z,2,3.21,1,N,211,233,1,16.3,1.0,0.5,2.13,0.0,1.0,23.43,2.5,0.0
2,2024-08-01T00:32:17Z,2024-08-01T00:45:53Z,1,3.8,1,N,170,239,1,17.7,1.0,0.5,4.54,0.0,1.0,27.24,2.5,0.0
2,2024-08-01T00:48:16Z,2024-08-01T01:09:23Z,1,5.54,1,N,239,148,1,27.5,1.0,0.5,6.5,0.0,1.0,39.0,2.5,0.0
2,2024-07-31T23:30:15Z,2024-07-31T23:40:24Z,1,1.56,1,N,164,229,1,11.4,1.0,0.5,3.0,0.0,1.0,19.4,2.5,0.0


Nombre de lignes dans la table : 20332093
Nombre de lignes à ingérer : 2979183
Insertion des données réussie
Fichier archivés dans abfss://datalaketaxi@taxistorag.dfs.core.windows.net/taxi_data//archive/2025-12-08-16-38-11/


path,name,size,modificationTime
abfss://datalaketaxi@taxistorag.dfs.core.windows.net/taxi_data/archive/2025-12-08-16-38-11/yellow_tripdata_2024-08.parquet,yellow_tripdata_2024-08.parquet,51067350,1765211892000


In [0]:
%sql
-- Preview a few rows of the Bronze trips table.
SELECT *
FROM bronze.trips_data
LIMIT 10;

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,ingest_time,file_name
1,2024-03-01T00:18:51Z,2024-03-01T00:23:45Z,0,1.3,1,N,142,239,1,8.6,3.5,0.5,2.7,0.0,1.0,16.3,2.5,0.0,2025-12-01T23:16:48.570545Z,yellow_tripdata_2024_03.parquet
1,2024-03-01T00:26:00Z,2024-03-01T00:29:06Z,0,1.1,1,N,238,24,1,7.2,3.5,0.5,3.0,0.0,1.0,15.2,2.5,0.0,2025-12-01T23:16:48.570545Z,yellow_tripdata_2024_03.parquet
2,2024-03-01T00:09:22Z,2024-03-01T00:15:24Z,1,0.86,1,N,263,75,2,7.9,1.0,0.5,0.0,0.0,1.0,10.4,0.0,0.0,2025-12-01T23:16:48.570545Z,yellow_tripdata_2024_03.parquet
2,2024-03-01T00:33:45Z,2024-03-01T00:39:34Z,1,0.82,1,N,164,162,1,7.9,1.0,0.5,1.29,0.0,1.0,14.19,2.5,0.0,2025-12-01T23:16:48.570545Z,yellow_tripdata_2024_03.parquet
1,2024-03-01T00:05:43Z,2024-03-01T00:26:22Z,0,4.9,1,N,263,7,2,25.4,3.5,0.5,0.0,0.0,1.0,30.4,2.5,0.0,2025-12-01T23:16:48.570545Z,yellow_tripdata_2024_03.parquet
2,2024-03-01T00:50:42Z,2024-03-01T01:10:40Z,1,5.04,1,N,238,159,2,25.4,1.0,0.5,0.0,0.0,1.0,27.9,0.0,0.0,2025-12-01T23:16:48.570545Z,yellow_tripdata_2024_03.parquet
2,2024-03-01T00:08:23Z,2024-03-01T00:17:53Z,1,2.15,1,N,161,141,1,12.1,1.0,0.5,5.13,0.0,1.0,22.23,2.5,0.0,2025-12-01T23:16:48.570545Z,yellow_tripdata_2024_03.parquet
2,2024-03-01T00:24:58Z,2024-03-01T00:30:31Z,1,1.1,1,N,236,237,1,8.6,1.0,0.5,2.04,0.0,1.0,15.64,2.5,0.0,2025-12-01T23:16:48.570545Z,yellow_tripdata_2024_03.parquet
2,2024-03-01T00:49:40Z,2024-03-01T01:01:25Z,1,2.78,1,N,161,114,1,14.9,1.0,0.5,2.0,0.0,1.0,21.9,2.5,0.0,2025-12-01T23:16:48.570545Z,yellow_tripdata_2024_03.parquet
1,2024-03-01T00:21:43Z,2024-03-01T00:24:44Z,1,0.3,1,N,237,141,2,5.1,3.5,0.5,0.0,0.0,1.0,10.1,2.5,0.0,2025-12-01T23:16:48.570545Z,yellow_tripdata_2024_03.parquet


In [0]:
%sql
-- Total number of rows in the Bronze trips table after ingestion.
SELECT count(*)
FROM bronze.trips_data;
    

count(1)
23311276


In [0]:
%sql 
-- List the source files that have contributed rows to the Bronze table.
SELECT DISTINCT file_name
FROM bronze.trips_data
ORDER BY file_name ASC;

file_name
yellow_tripdata_2024-08.parquet
yellow_tripdata_2024_01.parquet
yellow_tripdata_2024_02.parquet
yellow_tripdata_2024_03.parquet
yellow_tripdata_2024_04.parquet
yellow_tripdata_2024_05.parquet
yellow_tripdata_2024_06.parquet


In [0]:
%sql
-- Inspect the columns and table-level metadata of the Bronze trips table.
DESCRIBE EXTENDED bronze.trips_data;
    

col_name,data_type,comment
VendorID,int,
tpep_pickup_datetime,timestamp,
tpep_dropoff_datetime,timestamp,
passenger_count,bigint,
trip_distance,double,
RatecodeID,bigint,
store_and_fwd_flag,string,
PULocationID,int,
DOLocationID,int,
payment_type,bigint,


In [0]:
# List the landing folder once more to check what is left after the ingestion cell ran.
display(dbutils.fs.ls(root_path))

path,name,size,modificationTime
abfss://datalaketaxi@taxistorag.dfs.core.windows.net/taxi_data/archive/,archive/,0,1765132667000
abfss://datalaketaxi@taxistorag.dfs.core.windows.net/taxi_data/file_history/,file_history/,0,1765210851000
abfss://datalaketaxi@taxistorag.dfs.core.windows.net/taxi_data/lookup/,lookup/,0,1765131913000


In [0]:
%sql
-- Disabled cleanup statement, kept for reference only (it targets bronze_db.trips_data):
-- DROP TABLE IF EXISTS bronze_db.trips_data;

In [0]:
# Load the taxi zone lookup CSV (header row, column types inferred by Spark).
# root_path already ends with "/", so strip it before joining to avoid a "//" in the path.
zones_csv_path = root_path.rstrip("/") + "/lookup/taxi_zone_lookup.csv"
df_zones = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(zones_csv_path)
)
display(df_zones)

LocationID,Borough,Zone,service_zone
1,EWR,Newark Airport,EWR
2,Queens,Jamaica Bay,Boro Zone
3,Bronx,Allerton/Pelham Gardens,Boro Zone
4,Manhattan,Alphabet City,Yellow Zone
5,Staten Island,Arden Heights,Boro Zone
6,Staten Island,Arrochar/Fort Wadsworth,Boro Zone
7,Queens,Astoria,Boro Zone
8,Queens,Astoria Park,Boro Zone
9,Queens,Auburndale,Boro Zone
10,Queens,Baisley Park,Boro Zone


In [0]:
# Make bronze the session's current database so the unqualified table name below resolves there.
spark.catalog.setCurrentDatabase("bronze")

# Save the zone lookup as a Delta table; save mode "ignore" is used for the write.
zones_writer = df_zones.write.format("delta").mode("ignore")
zones_writer.saveAsTable("taxi_zones")


In [0]:
%sql 
-- Preview a few rows of the zone lookup table.
SELECT *
FROM bronze.taxi_zones
LIMIT 10;


LocationID,Borough,Zone,service_zone
1,EWR,Newark Airport,EWR
2,Queens,Jamaica Bay,Boro Zone
3,Bronx,Allerton/Pelham Gardens,Boro Zone
4,Manhattan,Alphabet City,Yellow Zone
5,Staten Island,Arden Heights,Boro Zone
6,Staten Island,Arrochar/Fort Wadsworth,Boro Zone
7,Queens,Astoria,Boro Zone
8,Queens,Astoria Park,Boro Zone
9,Queens,Auburndale,Boro Zone
10,Queens,Baisley Park,Boro Zone


In [0]:
%sql
-- Latest pickup timestamp present in the legacy taxi.bronze_db.trips_data table.
SELECT max(tpep_pickup_datetime)
FROM taxi.bronze_db.trips_data;

max(tpep_pickup_datetime)
2026-06-26T23:53:12Z
