<a href="https://colab.research.google.com/github/ddkryptonite/aws-glue-pyspark-crypto-etl/blob/main/AWSGlue_PySpark_Script.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import json
import logging
import math
import os
import sys
from decimal import Decimal

import boto3
import pyspark.sql.functions as F
import requests
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType

# Logging: a module-level logger, with the root logger configured at INFO so
# the pipeline's progress messages are emitted (basicConfig writes to stderr).
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# Glue bootstrap: a new SparkContext wrapped in a GlueContext. The Job handle
# is committed at the end of the script, and `spark` is the GlueContext's
# SparkSession used to build DataFrames below.
sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
spark = glueContext.spark_session

# API Configuration
# The key can be supplied through the CMC_API_KEY environment variable so the
# secret does not have to be committed to source control; when the variable is
# unset the original placeholder is used, so existing behaviour is unchanged.
API_KEY = os.environ.get("CMC_API_KEY", "#########")
url = 'https://pro-api.coinmarketcap.com/v1/cryptocurrency/listings/latest'

headers = {
    'X-CMC_PRO_API_KEY': API_KEY,
    # "Accept" is the standard HTTP request header ("Accepts" is not one).
    'Accept': 'application/json'
}

# Fetch API data.
# Without a timeout, requests can wait forever on a stalled connection and the
# Glue job would hang; raise_for_status() turns HTTP errors (invalid key, rate
# limiting, outages) into a clear HTTPError instead of a confusing KeyError on
# 'data' when the error payload is indexed below.
REQUEST_TIMEOUT_SECONDS = 30
response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS)
response.raise_for_status()
data = response.json()['data']

# Extract the required fields from each listing. The USD figures are nested
# under entry["quote"]["USD"]; the other fields sit at the top level.
def _as_float(value):
    """Return ``value`` as a float, passing ``None`` through unchanged.

    JSON whole numbers (for example a reported volume of ``0``) decode to
    Python ints, but when ``createDataFrame`` verifies rows against an explicit
    schema, ``DoubleType`` fields accept only Python floats. Numeric fields are
    therefore normalized here, before the DataFrame is built.
    """
    return None if value is None else float(value)


flattened_data = []

for entry in data:
    usd_quote = entry["quote"]["USD"]
    flattened_data.append({
        "id": entry["id"],
        "name": entry["name"],
        "symbol": entry["symbol"],
        "price_usd": _as_float(usd_quote["price"]),
        "market_cap_usd": _as_float(usd_quote["market_cap"]),
        "volume_24h_usd": _as_float(usd_quote["volume_24h"]),
        "last_updated": entry["last_updated"],
    })

# Explicit schema for the flattened records, declared as (column, type) pairs.
# Every column is nullable.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("id", LongType()),
        ("name", StringType()),
        ("symbol", StringType()),
        ("price_usd", DoubleType()),
        ("market_cap_usd", DoubleType()),
        ("volume_24h_usd", DoubleType()),
        ("last_updated", StringType()),
    ]
])

# Build the DataFrame from the list of flattened dicts using that schema.
df = spark.createDataFrame(flattened_data, schema=schema)

# Dump the schema to the job output so column names and types can be checked.
df.printSchema()

# Rename id/name to crypto_id/crypto_name and fix the output column order.
df = df.select(
    F.col("id").alias("crypto_id"),
    F.col("name").alias("crypto_name"),
    F.col("symbol"),
    F.col("price_usd"),
    F.col("market_cap_usd"),
    F.col("volume_24h_usd"),
    F.col("last_updated"),
)

# Ensure the numeric columns are doubles and replace NaN with NULL.
# In Spark, NaN is a non-null double, so isNotNull() alone never catches it;
# F.isnan is required. (isnan(NULL) is false, so existing NULLs stay NULL.)
# NaN must not reach the DynamoDB load, because boto3's number serializer
# rejects it.
for numeric_col in ("price_usd", "market_cap_usd", "volume_24h_usd"):
    as_double = F.col(numeric_col).cast(DoubleType())
    df = df.withColumn(
        numeric_col,
        F.when(F.isnan(as_double), F.lit(None).cast(DoubleType())).otherwise(as_double),
    )

# Persist the processed DataFrame to S3 as Parquet, replacing any prior output
# at this prefix.
PROCESSED_S3_PATH = "s3://danieldogbey-awsglue-bucket/processed_crypto_data/"
df.write.parquet(PROCESSED_S3_PATH, mode="overwrite")

# Store Processed Data in DynamoDB
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("processed_crypto_table")


def _to_dynamo_number(value):
    """Convert a float to a Decimal for DynamoDB, or ``None`` if it can't be stored.

    boto3's DynamoDB serializer rejects Python floats, so numbers are passed as
    Decimal. Going through ``str()`` keeps the float's short repr rather than
    its full binary expansion. NaN and +/-Infinity are not valid DynamoDB
    numbers: ``Decimal(str(nan))`` is ``Decimal('NaN')``, which boto3 refuses
    with a TypeError that would abort the whole batch. Such values are stored
    as NULL instead, matching the pipeline's NaN -> NULL intent.
    """
    if value is None or not math.isfinite(value):
        return None
    return Decimal(str(value))


# batch_writer() buffers put_item calls and sends them as batched writes,
# flushing whatever remains when the with-block exits.
with table.batch_writer() as batch:
    # collect() materializes every row on the driver; presumably the listing
    # result set is small enough for that — revisit if it grows substantially.
    for row in df.collect():
        batch.put_item(Item={
            "id": row["crypto_id"],
            "crypto_name": row["crypto_name"],
            "symbol": row["symbol"],
            "price_usd": _to_dynamo_number(row["price_usd"]),
            "market_cap_usd": _to_dynamo_number(row["market_cap_usd"]),
            "volume_24h_usd": _to_dynamo_number(row["volume_24h_usd"]),
            "last_updated": row["last_updated"],
        })


logger.info("✅ ETL Pipeline Completed Successfully")

# NOTE(review): job.init(...) is never called before this commit (the usual Glue
# boilerplate calls job.init(args['JOB_NAME'], args)). Confirm that is intended
# for how this script is deployed.
job.commit()
