In [0]:
import requests
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# Obtain the shared Spark session (created on first use, reused afterwards).
spark = (
    SparkSession.builder
    .appName("ReadCSV")
    .getOrCreate()
)

# Location of the source CSV under the DBFS mount.
# NOTE(review): this path sits under /mnt/bhagi4c, which the following cell
# (re)mounts -- confirm the mount already exists before running this cell.
csv_file_path = "/mnt/bhagi4c/dataset.csv"  # Update this with your actual path

# Load the CSV: the first line is the header and column types are inferred.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(csv_file_path)
)



In [0]:
# Storage account and container that back the DBFS mount.
storage_account_name = "bhagisa"
container_name = "bhagi4c"
mount_point = f"/mnt/{container_name}"

# Read the access key from a Databricks secret scope instead of embedding it in
# the notebook: anything typed here ends up in exports, revision history and
# shared copies. The key that was previously hardcoded in this cell should be
# rotated in Azure.
# TODO: replace the scope/key names below with the ones defined in your workspace.
storage_account_access_key = dbutils.secrets.get(
    scope="bhagisa-storage",
    key="storage-account-access-key",
)

# Spark/Hadoop configuration entry that carries the account key.
account_key_conf = f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net"

# Configure access key for direct wasbs:// access from this session
spark.conf.set(account_key_conf, storage_account_access_key)

# Unmount only if the mount point currently exists; dbutils.fs.unmount raises
# when the directory is not mounted, which would break a first run.
if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)

# Mount the Azure Blob Storage container to DBFS
dbutils.fs.mount(
    source=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net",
    mount_point=mount_point,
    extra_configs={account_key_conf: storage_account_access_key},
)

# Path to the CSV file inside the mounted container
csv_file_path = f"/mnt/{container_name}/dataset.csv"

# Read the CSV file into a Spark DataFrame (header row, inferred column types)
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)


# Function to get item details from the API
def get_item_details(item_id, timeout=10):
    """Fetch a single object from the restful-api.dev ``objects`` endpoint.

    Args:
        item_id: Identifier interpolated into the request URL.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the notebook indefinitely.

    Returns:
        The decoded JSON body when the API answers with HTTP 200. ``None`` when
        the request fails, the status code is anything other than 200, or the
        body is not valid JSON; a message is printed in each of those cases.
    """
    url = f"https://api.restful-api.dev/objects/{item_id}"

    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        # Network-level failure (DNS, refused connection, timeout, ...): report
        # it like any other failed lookup instead of aborting the caller's loop.
        print(f"Failed to retrieve data for item ID {item_id}. Request error: {exc}")
        return None

    if response.status_code != 200:
        print(f"Failed to retrieve data for item ID {item_id}. Status code: {response.status_code}")
        return None

    try:
        return response.json()
    except ValueError as exc:
        # A 200 response whose body is not JSON is treated as a failed lookup.
        print(f"Failed to retrieve data for item ID {item_id}. Invalid JSON: {exc}")
        return None
# Add item names to the dataset.
# Each distinct item_id is looked up once, so repeated items do not trigger
# repeated HTTP calls, and names are attached by key rather than by row
# position, so the result does not depend on two separate Spark actions
# returning rows in the same order.
distinct_item_ids = [
    row["item_id"] for row in df.select("item_id").distinct().collect()
]

item_name_by_id = {}
for item_id in distinct_item_ids:
    item_data = get_item_details(item_id)
    if item_data and 'name' in item_data:
        item_name_by_id[item_id] = item_data['name']
    else:
        # Failed lookup or a payload without a name
        item_name_by_id[item_id] = "Unknown"

# Bring the orders to the driver as pandas and attach the name column by item_id.
# IDs with no entry in the lookup (e.g. missing values) fall back to "Unknown".
orders_pdf = df.toPandas()
orders_pdf["item_name"] = orders_pdf["item_id"].map(
    lambda item_id: item_name_by_id.get(item_id, "Unknown")
)

# Convert back to a Spark DataFrame for display and writing
df = spark.createDataFrame(orders_pdf)

# Display the updated DataFrame
df.show()

# Display the schema
df.printSchema()

# Path to the output CSV directory in the mounted Azure Blob Storage container
output_csv_file_path = f"/mnt/{container_name}/output_dataset"

# Write the DataFrame to Azure Blob Storage, replacing any previous output
df.write.csv(output_csv_file_path, header=True, mode='overwrite')



/mnt/bhagi4c has been unmounted.
+--------+-------------+----------+-------+-----+--------------------+
|order_id|customer_name|order_date|item_id|price|           item_name|
+--------+-------------+----------+-------+-----+--------------------+
|       1|     John Doe|2025-01-01|      3|  100|Apple iPhone 12 P...|
|       1|     John Doe|2025-01-01|      7|  150|Apple MacBook Pro 16|
|       2|   Jane Smith|2025-01-02|      1|   50|  Google Pixel 6 Pro|
|       3|  Bob Johnson|2025-01-03|      5|   75|Samsung Galaxy Z ...|
|       3|  Bob Johnson|2025-01-03|      9|  200|Beats Studio3 Wir...|
|       4|  Emily Davis|2025-01-04|      2|   60|Apple iPhone 12 M...|
|       5|Michael Brown|2025-01-05|      8|  170|Apple Watch Series 8|
|       5|Michael Brown|2025-01-05|      4|   55|Apple iPhone 11, ...|
+--------+-------------+----------+-------+-----+--------------------+

root
 |-- order_id: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- order_date: date (