Yes, you can configure an S3 bucket for free using the AWS Free Tier. AWS offers a free tier that includes 5 GB of standard storage, 20,000 GET requests, and 2,000 PUT requests per month for the first 12 months.

Here are the steps to create and configure an S3 bucket:

Sign in to the AWS Management Console: Go to the AWS Management Console and sign in with your AWS account.

Create an S3 Bucket:

Navigate to the S3 service.
Click on "Create bucket".
Enter a unique bucket name and select the region.
Configure the bucket settings as needed (you can leave the default settings for now).
Click "Create bucket".
Upload Files to the S3 Bucket:

Click on the bucket name you just created.
Click on "Upload" and follow the instructions to upload your CSV files.
Set Permissions:

Ensure that the bucket has the appropriate permissions for your Databricks environment to access it.
You may need to configure an IAM role with the necessary permissions and attach it to your Databricks cluster.
Access the S3 Bucket from Databricks:

Use the s3a:// protocol to access your S3 bucket in your Databricks notebook.
Here is an example of how to read a CSV file from your S3 bucket in Databricks:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, FloatType

# Define the schema for the sales data
salesSchema = StructType([
    StructField("SalesOrderNumber", StringType(), True),
    StructField("SalesOrderLineNumber", IntegerType(), True),
    StructField("OrderDate", DateType(), True),
    StructField("CustomerName", StringType(), True),
    StructField("Email", StringType(), True),
    StructField("Item", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("UnitPrice", FloatType(), True),
    StructField("Tax", FloatType(), True)
])

# Read the CSV files from S3 into a DataFrame
sales = spark.read.format("csv").option("header", "true").schema(salesSchema).load("s3a://your-bucket-name/Databricks-Sales/Dataset/*.csv")

# Display the first 10 rows of the DataFrame
display(sales.limit(10))



To configure an IAM role for S3 access in Databricks, follow these steps:

Create an IAM Role:

Sign in to the AWS Management Console and navigate to the IAM service.
Click on "Roles" and then "Create role".
Select "AWS service" and choose "EC2" as the trusted entity.
Click "Next: Permissions" and attach the necessary policies for S3 access (e.g., AmazonS3FullAccess).
Click "Next: Tags" (optional) and then "Next: Review".
Provide a role name and click "Create role".
Modify the Trust Relationship:

After creating the role, go to the role's "Trust relationships" tab.
Click "Edit trust relationship" and update the policy to allow Databricks to assume the role. Use the following JSON, replacing <databricks-account-id> and <external-id> with your Databricks account ID and external ID:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::<databricks-account-id>:role/<databricks-role>"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "<external-id>"
        }
      }
    }
  ]
}


Example:
Assuming your Databricks account ID is 123456789012 and the role provided by Databricks is databricks-role, the trust relationship JSON would look like this:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::123456789012:role/databricks-role"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "123456789012"
        }
      }
    }
  ]
}

Add the IAM Role to Databricks:

In the Databricks workspace, go to the "Admin Console" and then "AWS".
Click "Add Instance Profile" and enter the ARN of the IAM role you created.
Click "Add".
Attach the IAM Role to a Cluster:

When creating or editing a cluster, go to the "Advanced Options" and then "Instance Profiles".
Select the IAM role you added and attach it to the cluster.
Access S3 from Databricks:

Use the s3a:// protocol to access your S3 bucket in your Databricks notebook.


In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, FloatType

salesSchema = StructType([
    StructField("SalesOrderNumber", StringType(), True),
    StructField("SalesOrderLineNumber", IntegerType(), True),
    StructField("OrderDate", DateType(), True),
    StructField("CustomerName", StringType(), True),
    StructField("Email", StringType(), True),
    StructField("Item", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("UnitPrice", FloatType(), True),
    StructField("Tax", FloatType(), True)
])

# Read the CSV files from S3 into a DataFrame
sales = spark.read.format("csv").option("header", "true").schema(salesSchema).load("s3a://selvaformabucket/Sales/*.csv")

# Display the first 10 rows of the DataFrame
display(sales.limit(10))

"""
----------------------------------------------------------------------Code in scala
import org.apache.spark.sql.types._

val salesSchema = StructType(List(
     StructField("SalesOrderNumber", StringType),
     StructField("SalesOrderLineNumber", IntegerType),
     StructField("OrderDate", DateType),
     StructField("CustomerName", StringType),
     StructField("Email", StringType),
     StructField("Item", StringType),
     StructField("Quantity", IntegerType),
     StructField("UnitPrice", FloatType),
     StructField("Tax", FloatType)
     ))

val sales = spark.read.format("csv").option("header", "true").schema(salesSchema).load("Databricks-Sales/Dataset/*.csv")
display(sales.head(10)
        """

In [0]:
sales.count()

In [0]:
%sql
--create the bronze table.
--it is a delta table
CREATE OR REPLACE TABLE sales.bronze.sales_bronze
(
    SalesOrderNumber STRING,
    SalesOrderLineNumber INT,
    OrderDate DATE,
    CustomerName STRING,
    Email string,
    Item STRING,
    Quantity INT,
    UnitPrice FLOAT,
    Tax FLOAT
)
USING DELTA

In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import expr

# Load the Delta table
deltaTable = DeltaTable.forName(spark, "sales.bronze.sales_bronze")

# Perform the merge operation
deltaTable.alias("sb").merge(
    sales.alias("updates"),
    "sb.SalesOrderNumber = updates.SalesOrderNumber AND sb.OrderDate = updates.OrderDate AND sb.CustomerName = updates.CustomerName AND sb.Item = updates.Item"
).whenMatchedUpdate(
    set={
        "SalesOrderNumber": "updates.SalesOrderNumber",
        "SalesOrderLineNumber": "updates.SalesOrderLineNumber",
        "OrderDate": "updates.OrderDate",
        "CustomerName": "updates.CustomerName",
        "Email": "updates.Email",
        "Item": "updates.Item",
        "Quantity": "updates.Quantity",
        "UnitPrice": "updates.UnitPrice",
        "Tax": "updates.Tax"
    }
).whenNotMatchedInsert(
    values={
        "SalesOrderNumber": "updates.SalesOrderNumber",
        "SalesOrderLineNumber": "updates.SalesOrderLineNumber",
        "OrderDate": "updates.OrderDate",
        "CustomerName": "updates.CustomerName",
        "Email": "updates.Email",
        "Item": "updates.Item",
        "Quantity": "updates.Quantity",
        "UnitPrice": "updates.UnitPrice",
        "Tax": "updates.Tax"
    }
).execute()
"""
Code in scala
import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forPath(spark, "Tables/sales_bronze")

deltaTable.as("sb")
.merge(sales.as("updates"),
        "sb.SalesOrderNumber = updates.SalesOrderNumber and sb.OrderDate = updates.OrderDate and sb.CustomerName = updates.CustomerName and sb.Item = updates.Item")
        .whenMatched
        .updateExpr(
            Map(
                "SalesOrderNumber" -> "updates.SalesOrderNumber",
                "SalesOrderLineNumber" -> "updates.SalesOrderLineNumber",
                "OrderDate" -> "updates.OrderDate",
                "CustomerName" -> "updates.CustomerName",
                "Email" -> "updates.Email",
                "Item" -> "updates.Item",
                "Quantity" -> "updates.Quantity",
                "UnitPrice" -> "updates.UnitPrice",
                "Tax" -> "updates.Tax"
            )
        )
        .whenNotMatched
        .insertExpr(
            Map(
                "SalesOrderNumber" -> "updates.SalesOrderNumber",
                "SalesOrderLineNumber" -> "updates.SalesOrderLineNumber",
                "OrderDate" -> "updates.OrderDate",
                "CustomerName" -> "updates.CustomerName",
                "Email" -> "updates.Email",
                "Item" -> "updates.Item",
                "Quantity" -> "updates.Quantity",
                "UnitPrice" -> "updates.UnitPrice",
                "Tax" -> "updates.Tax"
            )
        )
        .execute()"""

In [0]:
%sql
SELECT * FROM sales.bronze.sales_bronze