In [1]:
from delta import configure_spark_with_delta_pip
import pyspark
import requests
import pandas as pd
import json
import csv

In [42]:
with open("credentials.txt", "r") as f:
    api_key = f.read()

headers = {
    'Accepts': 'application/json',
    'X-CMC_PRO_API_KEY': api_key,
}

# API endpoint
url = 'https://pro-api.coinmarketcap.com/v1/cryptocurrency/listings/latest'

# Making API request
r = requests.get(url, headers=headers)
data = r.json()
# print(data)
transformed_data_list = []

for currency in data['data']:
    transformed_data = {
        'id': int(currency["id"]),  
        'name': currency["name"],
        'symbol': currency["symbol"],
        'date_added': currency["date_added"],
        "Price": float(currency['quote']['USD']['price']), 
        'volume_24h': float(currency['quote']['USD']['volume_24h']),
        'volume_change_24h': float(currency['quote']['USD']['volume_change_24h']),
        'max_supply': float(currency["max_supply"]) if currency["max_supply"] is not None else None,
        'circulating_supply': float(currency["circulating_supply"]) if currency["circulating_supply"] is not None else None,
        'total_supply': float(currency["total_supply"]) if currency["total_supply"] is not None else None,
        'infinite_supply': str(currency["infinite_supply"]),  
        'cmc_rank': int(currency["cmc_rank"]),  
        'last_updated': currency["last_updated"]
    }
    
    transformed_data_list.append(transformed_data)
transformed_data_list

[{'id': 1,
  'name': 'Bitcoin',
  'symbol': 'BTC',
  'date_added': '2010-07-13T00:00:00.000Z',
  'Price': 84644.78995981331,
  'volume_24h': 29436076512.222034,
  'volume_change_24h': -6.1104,
  'max_supply': 21000000.0,
  'circulating_supply': 19836878.0,
  'total_supply': 19836878.0,
  'infinite_supply': 'False',
  'cmc_rank': 1,
  'last_updated': '2025-03-14T19:30:00.000Z'},
 {'id': 1027,
  'name': 'Ethereum',
  'symbol': 'ETH',
  'date_added': '2015-08-07T00:00:00.000Z',
  'Price': 1935.8155746012596,
  'volume_24h': 12816731366.172659,
  'volume_change_24h': -30.3381,
  'max_supply': None,
  'circulating_supply': 120614183.5975569,
  'total_supply': 120614183.5975569,
  'infinite_supply': 'True',
  'cmc_rank': 2,
  'last_updated': '2025-03-14T19:30:00.000Z'},
 {'id': 825,
  'name': 'Tether USDt',
  'symbol': 'USDT',
  'date_added': '2015-02-25T00:00:00.000Z',
  'Price': 1.0000848462505614,
  'volume_24h': 61522047076.00157,
  'volume_change_24h': -11.0094,
  'max_supply': None,
  

In [43]:
builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [44]:
df = spark.createDataFrame(transformed_data_list)

In [45]:
df.show()

+--------------------+--------------------+--------+--------------------+-----+---------------+--------------------+-------------------+------------+------+--------------------+--------------------+-----------------+
|               Price|  circulating_supply|cmc_rank|          date_added|   id|infinite_supply|        last_updated|         max_supply|        name|symbol|        total_supply|          volume_24h|volume_change_24h|
+--------------------+--------------------+--------+--------------------+-----+---------------+--------------------+-------------------+------------+------+--------------------+--------------------+-----------------+
|   84644.78995981331|         1.9836878E7|       1|2010-07-13T00:00:...|    1|          False|2025-03-14T19:30:...|              2.1E7|     Bitcoin|   BTC|         1.9836878E7|2.943607651222203...|          -6.1104|
|  1935.8155746012596| 1.206141835975569E8|       2|2015-08-07T00:00:...| 1027|           True|2025-03-14T19:30:...|               N

In [33]:
lakehouse = spark.sql("""
    CREATE TABLE IF NOT EXISTS crypto_data
        (id INTEGER,
        name STRING,
        symbol STRING,
        date_added TIMESTAMP,
        Price FLOAT,
        volume_24h FLOAT,
        volume_change_24h FLOAT,
        max_supply FLOAT,
        circulating_supply FLOAT,
        total_supply FLOAT,
        infinite_supply STRING,
        cmc_rank INT,
        last_updated TIMESTAMP) USING delta
""")

In [36]:
df.write.mode("append").format("delta").saveAsTable("crypto_data")

AnalysisException: [DELTA_FAILED_TO_MERGE_FIELDS] Failed to merge fields 'id' and 'id'