# S3 to Bronze (Managed Delta)

Read directly from S3 and write to managed Delta tables in `development.bronze`.  
S3 remains the source of truth; the Databricks bronze layer holds only Delta tables (the files are not copied into a volume).

In [None]:
from pyspark.dbutils import DBUtils
from pyspark.sql import SparkSession

# Source bucket and the catalog/schema that the bronze tables are written to.
bucket = "kbannu-test1"
catalog, schema = "development", "bronze"

# Reuse the active Spark session (or start one) and build the dbutils handle
# that is needed for the secret lookups below.
spark = SparkSession.builder.getOrCreate()
dbutils = DBUtils(spark)

# S3A credentials are read from the "quest-aws" secret scope and set on the
# Hadoop configuration of the running Spark context. Access keys are used
# here; an IAM role assigned to the cluster would be an alternative.
sc = spark.sparkContext
conf = sc._jsc.hadoopConfiguration()
conf.set(
    "fs.s3a.access.key",
    dbutils.secrets.get("quest-aws", "access-key-id"),
)
conf.set(
    "fs.s3a.secret.key",
    dbutils.secrets.get("quest-aws", "secret-access-key"),
)

# 1) JSON from S3 → development.bronze.datausa_population_raw

In [None]:
from pyspark.sql import functions as F

# Object key and full s3a:// URI of the population JSON file.
s3_key_json = "datausa/acs_yg_total_population_1.json"
s3_path_json = f"s3a://{bucket}/{s3_key_json}"

# Read in multi-line mode so a JSON document spanning several lines is parsed
# as one record instead of line by line.
raw_json = spark.read.json(s3_path_json, multiLine=True)

# One output row per element of the top-level "data" array, with the
# element's fields promoted to top-level columns.
# NOTE(review): the JSON field names become Delta column names unchanged;
# confirm they contain no characters that Delta rejects in column names.
exploded_rows = raw_json.select(F.explode("data").alias("row"))
df_pop = exploded_rows.select("row.*")

# Replace the contents of the bronze table with this load.
population_table = f"{catalog}.{schema}.datausa_population_raw"
df_pop.write.format("delta").mode("overwrite").saveAsTable(population_table)
print(f"Created {population_table} from {s3_path_json}")

# 2) CSV from S3 → development.bronze.bls_pr_data_raw

In [None]:
from pyspark.sql import functions as F

# Object key and full s3a:// URI of the BLS "pr" time-series data file.
s3_key_csv = "bls/pr/pub/time.series/pr/pr.data.0.Current"
s3_path_csv = f"s3a://{bucket}/{s3_key_csv}"

# Tab-delimited text with a header row; column types are inferred by Spark.
csv_reader = spark.read.options(header=True, delimiter="\t", inferSchema=True)
df_csv_raw = csv_reader.csv(s3_path_csv)

# Strip leading/trailing whitespace from every header name in a single pass.
df_csv_raw = df_csv_raw.toDF(*[name.strip() for name in df_csv_raw.columns])

# Replace the contents of the bronze table with this load.
bls_table = f"{catalog}.{schema}.bls_pr_data_raw"
df_csv_raw.write.format("delta").mode("overwrite").saveAsTable(bls_table)
print(f"Created {bls_table} from {s3_path_csv}")