# Phase 1: Setup

In [0]:
%pip install requests pandas pyspark
# in case modules are not intalled

In [0]:
import requests
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, TimestampType
from pyspark.sql.functions import col, from_unixtime, to_date

# 1. Define the API and Coins
# Coingecko allows historical fetch without api key
coins = ["bitcoin", "ethereum", "solana"]
base_url = "https://api.coingecko.com/api/v3/coins/{}/market_chart?vs_currency=usd&days={}"
days = "365"

all_data = []

# 2. Loop through coins and fetch data
print("Starting Ingestion")
for coin in coins:
    url = base_url.format(coin, days)
    try:
        response = requests.get(url)
        data = response.json()

        # The API returns a list of [timestamp (ms), price], we need to parse that
        if 'prices' in data:
            for entry in data['prices']:
                # entry[0] is timestamp, entry[1] is price
                all_data.append({
                    "coin_id": coin,
                    "timestamp_ms": entry[0],
                    "price_usd": entry[1],
                    "source": "coingecko_api"
                })
            print(f"Success {coin}")
        else:
            print(f"Error fetching {coin}: {data}")
    except Exception as e:
        print(f"Failed to fetch {coin}: {e}")

# 3. Convert to Pandas first (easier for JSON lists), then Spark
pdf = pd.DataFrame(all_data)

## Define schema explicitly to avoid data type issues
schema = StructType([
    StructField("coin_id", StringType(), True),
    StructField("timestamp_ms", LongType(), True),
    StructField("price_usd", DoubleType(), True),
    StructField("source", StringType(), True)
])

## Create Spark DataFrame
raw_df = spark.createDataFrame(pdf, schema=schema)

# 4. Add human-redeable dates (Transformation)
bronze_df = raw_df \
    .withColumn("event_time", (col("timestamp_ms") / 1000).cast(TimestampType())) \
    .withColumn("date", to_date(col("event_time")))

# 5. Write to Delta Lake (Bonze Layer)
# We use 'overwrite' for this initial load. for daily updates, we would use 'append'
bronze_df.write.format("delta").mode("overwrite").saveAsTable("bronze_crypto_prices")

print(f"Success! {bronze_df.count()} records written to 'bronze_crypto_prices'.")
display(bronze_df)