# History Ingest

## Generate a CSV file in a volume (locally)

In [0]:
import requests

# Unity Catalog schema (under the "dbx_training" catalog) that holds the
# volume used for the raw file.
schema: str = "so_schema"
# limit: int = 200
# Destination of the downloaded CSV inside the volume.
csv_path: str = f"/Volumes/dbx_training/{schema}/raw_files_objets_trouves/data.csv"

# CSV export endpoint of the "objets-trouves-restitution" dataset. The query
# string asks for French labels as column headers (use_labels=true, lang=fr)
# and ";" as the field delimiter (delimiter=%3B).
raw_url_objets_trouves: str = "https://ressources.data.sncf.com/api/explore/v2.1/catalog/datasets/objets-trouves-restitution/exports/csv?lang=fr&timezone=Europe%2FBerlin&use_labels=true&delimiter=%3B"

# requests applies no timeout by default, so a stalled server would block the
# notebook forever. Values are (connect, read) seconds; the read timeout bounds
# the wait between received bytes, not the total download time.
response = requests.get(raw_url_objets_trouves, timeout=(10, 300))
# Fail on any HTTP error status instead of saving an error page as CSV.
response.raise_for_status()

# The file is written only after the full body has been received, so a failed
# download never leaves a truncated CSV in the volume.
with open(csv_path, "wb") as file:
    file.write(response.content)

### Read the CSV, convert it to a Spark DataFrame, and write it to a Unity Catalog BRONZE table

In [0]:
# Load the downloaded CSV (first row is the header, ";" is the delimiter) into
# a Spark DataFrame. No schema is supplied or inferred, so every column is
# read as a string.
df_bronze = spark.read.option("header","true").option("delimiter",";").csv(csv_path)

# Replace the spaces in every column name with "_" (e.g. "Code UIC" ->
# "Code_UIC"). Delta tables reject spaces in column names unless column
# mapping is enabled. Renaming all columns rather than a hard-coded list
# keeps the write working if the export gains another header with a space.
df_bronze = df_bronze.toDF(*[name.replace(" ", "_") for name in df_bronze.columns])

# Keep only the rows whose "Date" value starts with "2023-".
df_bronze = df_bronze.filter(df_bronze["Date"].like("2023-%"))

# Create (or replace the content of) the Delta table "bronze_objets_trouves".
df_bronze.write.mode("overwrite").saveAsTable(f"dbx_training.{schema}.bronze_objets_trouves")

# Delete the csv file in the volume
#if dbutils.fs.ls(csv_path):
#     dbutils.fs.rm(csv_path)