# Reading and Writing Back to an S3 Bucket

This image shows the original files alongside the files that were written back by the code in this notebook.

![](images/S3-direct-read-write.png)

## Directly Connecting to a CSV File in an S3 Bucket

In [0]:
# Load the CSV straight from S3, treating the first row as the header,
# then preview the first five rows.
df = (
    spark.read
      .format("csv")
      .option("header", True)
      .load("s3a://jgarnett-us-east-1-bucket/s3_direct/MNQ_Stock.csv")
)
display(df.limit(5))

Trade #,Type,Date/Time,Signal,Price USD,Position size (qty),Position size (value),Net P&L USD,Net P&L %,Run-up USD,Run-up %,Drawdown USD,Drawdown %,Cumulative P&L USD,Cumulative P&L %
1,Exit long,2025-08-31 19:13,Short,23535.75,5,117553.75,246.25,0.1,350.0,0.15,-5.0,0.0,246.25,0.49
1,Entry long,2025-08-31 18:39,Long,23510.75,5,117553.75,246.25,0.1,350.0,0.15,-5.0,0.0,246.25,0.49
2,Exit short,2025-08-31 19:46,Long,23535.5,5,117678.75,0.0,0.0,66.25,0.03,-66.25,-0.03,246.25,0.49
2,Entry short,2025-08-31 19:13,Short,23535.75,5,117678.75,0.0,0.0,66.25,0.03,-66.25,-0.03,246.25,0.49
3,Exit long,2025-08-31 19:49,Short,23533.75,5,117677.5,-20.0,-0.01,21.25,0.01,-68.75,-0.03,226.25,0.45


In [0]:
# Write back to the S3 bucket as a single CSV object.
#
# Spark writes a directory of part files rather than one file, so the data is
# first written to a temporary DBFS folder and the single part file is then
# copied to the final S3 key.
prefix = "renamed_and_saved_"
new_cols = [f'{prefix}({c})' for c in df.columns]

df_renamed = df.toDF(*new_cols)  # rename all columns in one step


tmp_path = "dbfs:/tmp/MNQ_Stock_single_tmp"
final_path = "s3a://jgarnett-us-east-1-bucket/s3_direct/MNQ_Stock_Renamed_and_Saved.csv"

#1 Write to a temporary folder as a single-part CSV dataset
(
    df_renamed
      .coalesce(1)              # force single output file; otherwise each partition
                                # writes its own part file and only one would be copied
      .write
      .mode("overwrite")
      .option('header', 'true')
      .csv(tmp_path)
)

#2 Find the single part file
files = dbutils.fs.ls(tmp_path)
part_files = [f.path for f in files if f.name.startswith("part-")]
if len(part_files) != 1:
    # Fail loudly instead of silently copying a partial dataset (or hitting an
    # opaque IndexError when nothing was written).
    raise RuntimeError(
        f"Expected exactly one part file in {tmp_path}, found {len(part_files)}"
    )
part_file = part_files[0]

#3 Copy to final S3 file and clean up
dbutils.fs.cp(part_file, final_path)
dbutils.fs.rm(tmp_path, recurse=True)


True

## Directly Connecting to a JSON File in an S3 Bucket

In [0]:
# Load the JSON document from S3. multiLine is enabled so the reader accepts
# records that span several lines. Then preview the metadata column.
df2 = (
    spark.read
      .format("json")
      .option("multiLine", True)
      .load("s3a://jgarnett-us-east-1-bucket/s3_direct/zoneinfo_data.json")
)
display(df2.select('metadata').limit(2))

metadata
List(2020a)


In [0]:
from pyspark.sql.functions import col

# Build a select list that replaces any dots in the column names with
# underscores.
# NOTE: df2.columns lists only the top-level column names, so nested struct
# fields are NOT expanded here; this step only sanitises top-level names.
flat_cols = []
for c in df2.columns:
    # Backtick-quote the name so a column whose name literally contains a dot is
    # referenced as-is instead of being parsed as struct-field access
    # ("parent.child"). Embedded backticks are escaped by doubling them.
    quoted = "`" + c.replace("`", "``") + "`"
    flat_cols.append(col(quoted).alias(c.replace(".", "_")))

df2_flat = df2.select(*flat_cols)

# Rename all columns with a prefix
prefix = "renamed_and_saved_"
new_cols = [f'{prefix}({c})' for c in df2_flat.columns]
df2_renamed = df2_flat.toDF(*new_cols)  # rename all columns at once

In [0]:
# Show the renamed metadata column to confirm the prefix was applied.
metadata_preview = df2_renamed.select('renamed_and_saved_(metadata)')
display(metadata_preview)

renamed_and_saved_(metadata)
List(2020a)


In [0]:
# Writing to a json in S3
tmp_path2 = "dbfs:/tmp/my_json_single_tmp"
final_path = "s3a://jgarnett-us-east-1-bucket/s3_direct/renamed_zone_info.json"

# 1) Write to a temporary folder as a single-part JSON dataset
(
    df2_renamed
      .coalesce(1)              # force single output file
      .write
      .mode("overwrite")
      .json(tmp_path2)
)

# 2) Find the single part file
files = dbutils.fs.ls(tmp_path2)
part_file = [f.path for f in files if f.name.startswith("part-")][0]

# 3) Copy to final S3 key and clean up
dbutils.fs.cp(part_file, final_path)
dbutils.fs.rm(tmp_path2, recurse=True)



True