In [0]:
# %sql
# CREATE VOLUME workspace.raw.rawvolume


In [0]:
# dbutils.fs.mkdirs("/Volumes/workspace/raw/rawvolume/rawdata")

In [0]:
dbutils.fs.mkdirs("/Volumes/workspace/raw/rawvolume/rawdata/bookings")

In [0]:
# dbutils.fs.mkdirs("/Volumes/workspace/raw/rawvolume/rawdata/flights")

In [0]:
# dbutils.fs.mkdirs("/Volumes/workspace/raw/rawvolume/rawdata/customers")

In [0]:
# dbutils.fs.mkdirs("/Volumes/workspace/raw/rawvolume/rawdata/airports")

In [0]:
import requests
import base64
import pandas as pd
from io import StringIO

def load_github_csv_spark(filename, folder):
    """
    Fetch a CSV file from GitHub using the API, load it into a Spark DataFrame,
    and save it to a Unity Catalog volume or S3 bucket in append mode.

    Args:
        filename (str): Name of the CSV file in GitHub (e.g., 'dim_flights.csv')
        folder (str): Subfolder in the storage location (e.g., 'flights')
    """

    # -----------------------------
    # Step 1: Build GitHub API URL
    # -----------------------------
    # GitHub API returns JSON metadata, including Base64-encoded file content
    url = f"https://api.github.com/repos/AnkitaChouguleA/AnshLambaYoutube/contents/Databricks%20End%20To%20End%20Project/{filename}"

    # -----------------------------
    # Step 2: Call GitHub API
    # -----------------------------
    response = requests.get(url)
    if response.status_code == 200:

        # -----------------------------
        # Step 3: Decode Base64 content from GitHub
        # -----------------------------
        # GitHub API encodes file content in Base64
        file_content = response.json()['content']          # Get 'content' from JSON
        decoded_content = base64.b64decode(file_content)  # Decode Base64 to bytes
        csv_text = decoded_content.decode('utf-8')        # Convert bytes → string

        # -----------------------------
        # Step 4: Load CSV into pandas DataFrame
        # -----------------------------
        # StringIO lets pandas read the string as a CSV file
        pdf = pd.read_csv(StringIO(csv_text))

        # -----------------------------
        # Step 5: Convert pandas → Spark DataFrame
        # -----------------------------
        # Spark can now work with the data
        df = spark.createDataFrame(pdf)

        # -----------------------------
        # Step 6: Define storage path
        # -----------------------------
        # Unity Catalog volume path or S3 bucket path
        # Example Unity Catalog path:
        save_path = f"/Volumes/workspace/raw/rawvolume/rawdata/{folder}/"

        # -----------------------------
        # Step 7: Save DataFrame in append mode
        # -----------------------------
        # mode("append") → add new files without deleting existing data
        df.write.mode("append").parquet(save_path)

        # -----------------------------
        # Step 8: Print success message + show sample data
        # -----------------------------
        print(f"✅ {filename} appended to {save_path}")
        df.show(5)  # Show first 5 rows for verification

        # -----------------------------
        # Step 9: Return Spark DataFrame
        # -----------------------------
        return df

    else:
        # -----------------------------
        # Step 10: Error handling
        # -----------------------------
        print(f"❌ Error fetching {filename}: {response.status_code}")
        print(response.text)
        return None


In [0]:
# Load Flights dataset
df_flights = load_github_csv_spark("dim_flights.csv", "flights")

# Load Customers dataset
df_customers = load_github_csv_spark("dim_passengers.csv", "customers")

# Load Bookings dataset
df_bookings = load_github_csv_spark("fact_bookings.csv", "bookings")

# Load Airports dataset
df_airports = load_github_csv_spark("dim_airports.csv", "airports")


In [0]:
# Check if flights CSV was written to the Unity Catalog volume
display(dbutils.fs.ls("/Volumes/workspace/raw/rawvolume/rawdata/"))

In [0]:
# Check if flights CSV was written to the Unity Catalog volume
display(dbutils.fs.ls("/Volumes/workspace/raw/rawvolume/rawdata/flights/"))

In [0]:
# %sql
# CREATE SCHEMA workspace.gold

In [0]:
# %sql
# CREATE SCHEMA workspace.silver

In [0]:
# %sql
# CREATE SCHEMA workspace.bronze

In [0]:
# %sql
# CREATE VOLUME workspace.gold.goldvolume

In [0]:
# %sql
# CREATE VOLUME workspace.silver.silvervolume

In [0]:
# %sql
# CREATE VOLUME workspace.bronze.bronzevolume

In [0]:
# Load incremental bookings CSV from GitHub into rawdata/bookings
df_increment = load_github_csv_spark(
    "fact_bookings_increment.csv",  # New incremental CSV
    "bookings"                       # Matches your existing rawdata folder
)


In [0]:
%sql
select * from delta.`/Volumes/workspace/bronze/bronzevolume/flights/data/`

In [0]:
# Load incremental passengers CSV from GitHub into rawdata/customers
df_increment = load_github_csv_spark(
    "dim_passengers_increment.csv",  # New incremental CSV
    "customers"                       # Matches your existing rawdata folder
)


In [0]:
# Load incremental bookings CSV from GitHub into rawdata/airports
df_increment = load_github_csv_spark(
    "dim_airports_increment.csv",  # New incremental CSV
    "airports"                       # Matches your existing rawdata folder
)


In [0]:
# Load incremental bookings CSV from GitHub into rawdata/flights
df_increment = load_github_csv_spark(
    "dim_flights_increment.csv",  # New incremental CSV
    "flights"                       # Matches your existing rawdata folder
)


In [0]:
%sql
select * from delta.`/Volumes/workspace/bronze/bronzevolume/flights/data/`

In [0]:
# Load passengers SCD CSV from GitHub into rawdata/passengers
df_passengers_scd = load_github_csv_spark(
    "dim_passengers_scd.csv",   # Your file on GitHub
    "customers"                # Target subfolder inside rawdata
)