# Processing Data fetched from CoinCap API


In this `notebook` we will use `apache/spark` to deal with data fetched from [CoinCap API](https://docs.coincap.io/)

In [1]:
import requests
from datetime import datetime
import re

## Setup PySpark

First of all we need to setup `PySpark`

In [2]:
!apt-get update -qq

!apt-get install openjdk-8-jdk-headless -qq > /dev/null

!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

!tar xf spark-3.1.2-bin-hadoop2.7.tgz

!pip install -q findspark

import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

import findspark

findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder\
  .master('local[*]')\
  .appName("Iniciando com Spark")\
  .getOrCreate()

In [3]:
spark

## Fetching data from CoinCap


functions to fetch data from CoinCap's API:

In [4]:
API_URL = "https://api.coincap.io/v2"

In [5]:
def getCryptoCurrencyRealTimeData(asset: str = ""):
  try:
    res = requests.get(API_URL + f'/assets/{asset}')

    if res.status_code == 200:
      return res.json()
    else:
      print(f'Error: {res.status_code}')
  except requests.exceptions.RequestException as err:
    print(f'Error: {err}')


In [6]:
def getCryptoCurrencyDataRates(asset: str = ""):
  try:
    res = requests.get(API_URL + f'/rates/{asset}')
    print(res)
    if res.status_code == 200:
      return res.json()
    else:
      print(f'Error: {res.status_code}')
  except requests.exceptions.RequestException as err:
    print(f'Error: {err}')

In [7]:
json_assets = getCryptoCurrencyRealTimeData()

In [8]:
data_assets = json_assets['data']

for i,register in enumerate(data_assets):
  register['timestamp'] = json_assets['timestamp']
  data_assets[i] = register

print(data_assets)

[{'id': 'bitcoin', 'rank': '1', 'symbol': 'BTC', 'name': 'Bitcoin', 'supply': '19672556.0000000000000000', 'maxSupply': '21000000.0000000000000000', 'marketCapUsd': '1287527683208.3377486213377160', 'volumeUsd24Hr': '9128967839.9910500037656694', 'priceUsd': '65447.9104397180391110', 'changePercent24Hr': '-1.2472524950286412', 'vwap24Hr': '66196.8854766610679226', 'explorer': 'https://blockchain.info/', 'timestamp': 1712204418514}, {'id': 'ethereum', 'rank': '2', 'symbol': 'ETH', 'name': 'Ethereum', 'supply': '120069288.4506371900000000', 'maxSupply': None, 'marketCapUsd': '392055139115.6319969789802531', 'volumeUsd24Hr': '5961011343.9732881373458441', 'priceUsd': '3265.2407969987550817', 'changePercent24Hr': '-1.5500001114446777', 'vwap24Hr': '3323.1744263414114457', 'explorer': 'https://etherscan.io/', 'timestamp': 1712204418514}, {'id': 'tether', 'rank': '3', 'symbol': 'USDT', 'name': 'Tether', 'supply': '106149786847.0736100000000000', 'maxSupply': None, 'marketCapUsd': '1062485837

### Loading Data

Loading data in a local `/Raw` zone

In [9]:
import json

In [10]:
objects_uploaded = []

In [11]:
def dataExtracted(dataset: list[dict[str,any]]):
  print(dataset)
  return [{**data,
           "timestamp":dataset["timestamp"]} for data in dataset["data"]]

In [12]:
def _loadingBatch(batch_data:list[dict[str, any]],
                  endpoint: str,
                  batch_name:str):
  current_date = datetime.now()

  month = str(current_date.month).zfill(2)
  day = str(current_date.day).zfill(2)

  object_key=f"{endpoint.capitalize()}/{current_date.year}/{month}/{day}"

  directory = f"/content/dataset/Raw/JSON/{object_key}"
  os.makedirs(directory, exist_ok=True)
  filename = f"{current_date.timestamp()}-{batch_name}.json"
  filepath = f"{directory}/{filename}"

  with open(filepath, "w") as file:
    json.dump(batch_data, file, indent=2)

  objects_uploaded.append(f"{object_key}/{filename}")

def loadingData():
  assets = dataExtracted(dataset=getCryptoCurrencyRealTimeData())
  rates = dataExtracted(dataset=getCryptoCurrencyDataRates())

  batches_assets = [assets[i:i + 100] for i in range(1,len(assets), 100)]
  batches_rates = [rates[i:i + 100] for i in range(1,len(rates), 100)]


  for i, data in enumerate(batches_assets):
    _loadingBatch(batch_data=data,
                  batch_name=f"batch_{i}",
                  endpoint="assets")

  for i, data in enumerate(batches_rates):
    _loadingBatch(batch_data=data, batch_name=f"batch_{i}",endpoint="rates")

loadingData()

{'data': [{'id': 'bitcoin', 'rank': '1', 'symbol': 'BTC', 'name': 'Bitcoin', 'supply': '19672556.0000000000000000', 'maxSupply': '21000000.0000000000000000', 'marketCapUsd': '1287527683208.3377486213377160', 'volumeUsd24Hr': '9128967839.9910500037656694', 'priceUsd': '65447.9104397180391110', 'changePercent24Hr': '-1.2472524950286412', 'vwap24Hr': '66196.8854766610679226', 'explorer': 'https://blockchain.info/'}, {'id': 'ethereum', 'rank': '2', 'symbol': 'ETH', 'name': 'Ethereum', 'supply': '120069288.4506371900000000', 'maxSupply': None, 'marketCapUsd': '392055139115.6319969789802531', 'volumeUsd24Hr': '5961011343.9732881373458441', 'priceUsd': '3265.2407969987550817', 'changePercent24Hr': '-1.5500001114446777', 'vwap24Hr': '3323.1744263414114457', 'explorer': 'https://etherscan.io/'}, {'id': 'tether', 'rank': '3', 'symbol': 'USDT', 'name': 'Tether', 'supply': '106149786847.0736100000000000', 'maxSupply': None, 'marketCapUsd': '106248583747.4410387572134720', 'volumeUsd24Hr': '2091843

## Glueing Data

In [13]:
from pyspark.sql.functions import col, substring
from pyspark.sql.types import DecimalType, StructType, StructField, StringType, LongType
from pyspark.sql import DataFrame

### Treating data


In [14]:
TREATING_ENDPOINTS="assets,rates"
TREATING_INPUT_ZONE="Raw"
TREATING_TARGET_ZONE="Trusted"
TREATING_OBJECTS_KEY=','.join(objects_uploaded)

In [15]:
# utils functions
def getS3Path(prefix: str,
              endpoint: str,
              subdir:str = "",
              typeFile: str = "",
              filename:str = ""):

    endpoint = endpoint.lower().capitalize()

    filename = f'{subdir}/{filename}.{typeFile.lower()}' if subdir and typeFile and filename else ""

    return f"{prefix}/{endpoint}/{filename}"

def getTimestampFromKeyObject(object_key: str):

    timestamp_prefix = object_key.split("/")[-1].split("-")[0]

    return timestamp_prefix

def getDateSubDirFromKeyObject(object_key: str):
    regex_pattern_extract_subdir = r'(\d{4}/\d{2}/\d{2})'
    match = re.search(regex_pattern_extract_subdir, object_key)
    if match:
        return match.group()
    else:
        raise ValueError("No date subdirectory found in object key: {}".format(object_key))


# ETL functions
def extractData(prefix: str,
                endpoint: str,
                subdir: str = "",
                filename: str = "",
                typeFile: str = ""):

    objects_key = getS3Path(prefix=prefix,
                            endpoint=endpoint,
                            subdir=subdir,
                            filename=filename,
                            typeFile=typeFile)

    return spark.read.option("multiline","true").json(objects_key)


def transformData(endpoint: str,
                  df: DataFrame) -> DataFrame:
    if endpoint.lower() == "assets":
        cols_name = df.columns

        for col_name in cols_name:
            if col_name == 'marketCapUsd' or col_name == 'maxSupply' or col_name == 'priceUsd' or col_name == 'supply' or col_name == 'volumeUsd24Hr' or col_name == 'vwap24Hr':
                df = df.withColumn(col_name, col(col_name).cast(DecimalType(38, 18)))

        return df
    if endpoint.lower() == "rates":
        df = df.withColumn("rateUsd", col("rateUsd").cast(DecimalType(38, 18)))

        return df

    raise Exception("Invalid endpoint")

def loadData(prefix: str,
             endpoint: str,
             subdir_date: str,
             object_key: str,
             df: DataFrame) -> DataFrame:

    path = getS3Path(prefix=prefix,
                     endpoint=endpoint)
    print(path)
    df.show()
    df.write.parquet(f'{path}/{subdir_date}/{getTimestampFromKeyObject(object_key=object_key)}',
                     mode="overwrite")


def execute_job():
    extract_prefix = f"/content/dataset/{TREATING_INPUT_ZONE}/JSON"

    load_prefix = f"/content/dataset/{TREATING_TARGET_ZONE}"

    print(f"[GLUE_JOB_SYSTEM] - Starting to processing data from {extract_prefix} to store into {load_prefix}...")
    print(TREATING_OBJECTS_KEY)
    objects_key = TREATING_OBJECTS_KEY.split(",")
    print(objects_key)
    for endpoint in TREATING_ENDPOINTS.split(","):
        _objects_key = list(filter(lambda object_key: endpoint.lower().capitalize() in object_key,
                              objects_key))
        print(_objects_key)
        _str_objects_key = ", ".join(map(str, _objects_key))

        print(f"[GLUE_JOB_SYSTEM] - Starting processing data stores in {endpoint}...")
        print("[GLUE_JOB_SYSTEM] - The following files will be processed: {}".format(_str_objects_key))

        for object_key in _objects_key:
            print(f"[GLUE_JOB_SYSTEM] -  Starting extracting the data from {endpoint}...")

            subdir= getDateSubDirFromKeyObject(object_key=object_key)
            print(subdir)
            filename = f'{getTimestampFromKeyObject(object_key=object_key)}-*'
            print(f"{extract_prefix}/{endpoint.capitalize()}/{subdir}/{filename}")
            df = extractData(prefix=extract_prefix,
                             endpoint=endpoint,
                             subdir=subdir,
                             filename=filename,
                             typeFile="json")

            print(f"[GLUE_JOB_SYSTEM] - The data extracted has the following scheme:")
            df.printSchema()

            print(f"[GLUE_JOB_SYSTEM] - Starting transforming the data from {endpoint}...")

            df = transformData(endpoint=endpoint,
                               df=df)

            print(f"[GLUE_JOB_SYSTEM] - The data transformed has the following scheme:")
            df.printSchema()

            print(f"[GLUE_JOB_SYSTEM] - Starting to load the transformed data into {load_prefix}...")

            loadData(prefix=load_prefix,
                     endpoint=endpoint,
                     subdir_date=subdir,
                     object_key=object_key,
                     df=df)

            print(f"[GLUE_JOB_SYSTEM] - The data was succefully loaded into {load_prefix}")

execute_job()

[GLUE_JOB_SYSTEM] - Starting to processing data from /content/dataset/Raw/JSON to store into /content/dataset/Trusted...
Assets/2024/04/04/1712204419.123182-batch_0.json,Rates/2024/04/04/1712204419.126377-batch_0.json,Rates/2024/04/04/1712204419.127783-batch_1.json
['Assets/2024/04/04/1712204419.123182-batch_0.json', 'Rates/2024/04/04/1712204419.126377-batch_0.json', 'Rates/2024/04/04/1712204419.127783-batch_1.json']
['Assets/2024/04/04/1712204419.123182-batch_0.json']
[GLUE_JOB_SYSTEM] - Starting processing data stores in assets...
[GLUE_JOB_SYSTEM] - The following files will be processed: Assets/2024/04/04/1712204419.123182-batch_0.json
[GLUE_JOB_SYSTEM] -  Starting extracting the data from assets...
2024/04/04
/content/dataset/Raw/JSON/Assets/2024/04/04/1712204419.123182-*
[GLUE_JOB_SYSTEM] - The data extracted has the following scheme:
root
 |-- changePercent24Hr: string (nullable = true)
 |-- explorer: string (nullable = true)
 |-- id: string (nullable = true)
 |-- marketCapUsd: s

### Refining data

In [16]:
REFINING_ENDPOINTS="assets,rates"
REFINING_INPUT_ZONE="Trusted"
REFINING_TARGET_ZONE="Refined"
REFINING_OBJECTS_KEY=','.join(objects_uploaded)

In [17]:

def getS3Path(prefix: str,
              endpoint: str,
              subdir:str = "",
              typeFile: str = "",
              filename:str = ""):

    endpoint = endpoint = '_'.join(map(lambda sub: sub.capitalize(),
                                       endpoint.lower().split('_'))) if '_' in endpoint else endpoint.lower().capitalize()

    filename = f'{subdir}/{filename}'

    return f"{prefix}/{endpoint}/{filename}"

def extractData(prefix: str,
                endpoint: str,
                subdir: str = "",
                filename: str = "",
                typeFile: str = "",
                deepRead: bool = False):
    objects_key = getS3Path(prefix=prefix,
                            endpoint=endpoint,
                            subdir=subdir,
                            filename=filename,
                            typeFile=typeFile)
    # the files are uploaded in the following path: Zone/Entity/Year/Month/Day/Timestamp
    # if deepRead on it will read all subdir within Entity
    return spark.read.parquet(objects_key if not deepRead else f"{objects_key}/*/*/*/*/")

def transformData(entity: str,
                  df: DataFrame) -> DataFrame:
    if entity.lower() == "currency":
        df = df.select(col('id').alias("currency_id"),
                  col('currencySymbol').alias("currency_special_symbol"),
                  col('symbol').alias("currency_symbol"),
                  col("type").alias("currency_type"))

        df = df.withColumn("currency_type", substring(col("currency_type"), 1, 1))

        df.show()
        return df
    if entity.lower() == "crypto_data":
        df = df.select(col('id').alias("currency_id"),
                       col('changePercent24Hr').alias("diff_percent_24hr"),
                       col('maxSupply').alias("max_supply"),
                       col('supply'),
                       col('vwap24Hr').alias("vwap_24hr"),
                       col('volumeUsd24Hr').alias("vol_usd_24hr"),
                       col("marketCapUsd").alias("market_cap_usd"),
                       col('timestamp'))

        return df
    if entity.lower() == "currency_value":
        df.show()
        df = df.select(col("id").alias("currency_id"),col('rateUsd').alias("rate_usd"),
                       col('timestamp'))

        return df

    raise Exception("Invalid endpoint")

# Since method Currency will almost always be the same
# This method avoid load duplicate data into Refined zone
def checkAndUpdateRowsWithinRefinedZone(load_prefix: str,
                                        entity: str,
                                        df: DataFrame):
  try:
    # extract data already saved
    # deepRead will allow the spark to read all the subdir within the /Zone/Entity/*
    df_saved = extractData(prefix=load_prefix,
                           endpoint=entity, deepRead=True)
    print("df_saved table: ")
    df_saved.show()
    if df.schema != df_saved.schema:
      df_saved.printSchema()
      df.printSchema()
      print("The data frames don't have the same schema")
      return
    else:
        diff_df = df.exceptAll(df_saved)
        if diff_df.count() == 0:
          print("The data frames has the same values.")

          return None
        else:
          print("The data frames has different values.")
          diff_df.show()
          return df.union(df_saved)

  except Exception as err:
    print(err)
    return df


# ETL functions
def loadData(prefix: str,
             entity: str,
             subdir_date: str,
             filename: str,
             df: DataFrame) -> DataFrame:

    path = getS3Path(prefix=prefix,
                     endpoint=entity)

    if entity.lower() == "currency":
        df = checkAndUpdateRowsWithinRefinedZone(load_prefix=prefix,
                                                 entity=entity,
                                                 df=df)
        if not df:
            print(f"{entity} wasn't uploaded into Refined zone 'cause there is no new data to update")
            return None

    df.write.parquet(f'{path}/{subdir_date}/{filename}',
                     mode="overwrite")

def getTimestampFromKeyObject(object_key: str):

    timestamp_prefix = object_key.split("/")[-1].split("-")[0]

    return timestamp_prefix

def getDateSubDirFromKeyObject(object_key: str):
    regex_pattern_extract_subdir = r'(\d{4}/\d{2}/\d{2})'
    match = re.search(regex_pattern_extract_subdir, object_key)
    if match:
        return match.group()
    else:
        raise ValueError("No date subdirectory found in object key: {}".format(object_key))

def execute_job():
    extract_prefix = f"/content/dataset/{REFINING_INPUT_ZONE}"

    load_prefix = f"/content/dataset/{REFINING_TARGET_ZONE}"

    print(f"[GLUE_JOB_SYSTEM] - Starting to processing data from {extract_prefix} to store into {load_prefix}...")

    print(TREATING_OBJECTS_KEY)
    objects_key = TREATING_OBJECTS_KEY.split(",")
    print(objects_key)
    for endpoint in TREATING_ENDPOINTS.split(","):
        _objects_key = filter(lambda object_key: endpoint.lower().capitalize() in object_key,
                              objects_key)

        print(f"[GLUE_JOB_SYSTEM] - Starting refining data stores in {endpoint}...")

        for object_key in _objects_key:
            filename = f'{getTimestampFromKeyObject(object_key=object_key)}'
            subdir= getDateSubDirFromKeyObject(object_key=object_key)

            print(f"[GLUE_JOB_SYSTEM] - The following pathfile within /{REFINING_INPUT_ZONE} will be processed: {subdir}/{filename}")

            print(f"[GLUE_JOB_SYSTEM] -  Starting extracting the data from {endpoint}...")

            df = extractData(prefix=extract_prefix,
                             subdir=subdir,
                             endpoint=endpoint,
                             filename=filename,)
            entities = []

            if endpoint.lower() == "rates":
                entities.append("Currency")
                entities.append("Currency_Value")

            if endpoint.lower() == "assets":
                entities.append("Crypto_Data")

            print("[GLUE_JOB_SYSTEM] - The data will be treat as source data for the following entities: {}".format(", ".join(entities)))

            for entity in entities:

                print(f"[GLUE_JOB_SYSTEM] - The data is being transformed into a {entity}'s data structured...")
                print(df)
                df_transformed = transformData(entity=entity,
                                               df=df)

                print(f"[GLUE_JOB_SYSTEM] - The data transformed has the following scheme:")
                df_transformed.printSchema()

                print(f"[GLUE_JOB_SYSTEM] - Starting to load the transformed data into {load_prefix}...")

                loadData(prefix=load_prefix,
                         entity=entity,
                         filename= filename,
                         subdir_date=subdir,
                         df=df_transformed)

                print(f"[GLUE_JOB_SYSTEM] - The data was succefully loaded into {load_prefix}")

execute_job()

[GLUE_JOB_SYSTEM] - Starting to processing data from /content/dataset/Trusted to store into /content/dataset/Refined...
Assets/2024/04/04/1712204419.123182-batch_0.json,Rates/2024/04/04/1712204419.126377-batch_0.json,Rates/2024/04/04/1712204419.127783-batch_1.json
['Assets/2024/04/04/1712204419.123182-batch_0.json', 'Rates/2024/04/04/1712204419.126377-batch_0.json', 'Rates/2024/04/04/1712204419.127783-batch_1.json']
[GLUE_JOB_SYSTEM] - Starting refining data stores in assets...
[GLUE_JOB_SYSTEM] - The following pathfile within /Trusted will be processed: 2024/04/04/1712204419.123182
[GLUE_JOB_SYSTEM] -  Starting extracting the data from assets...
[GLUE_JOB_SYSTEM] - The data will be treat as source data for the following entities: Crypto_Data
[GLUE_JOB_SYSTEM] - The data is being transformed into a Crypto_Data's data structured...
DataFrame[changePercent24Hr: string, explorer: string, id: string, marketCapUsd: decimal(38,18), maxSupply: decimal(38,18), name: string, priceUsd: decimal(3

#### Testing Results

In [18]:
currency = spark.read.parquet("/content/dataset/Refined/Currency/2024/*/*/*/*")
crypto_data = spark.read.parquet("/content/dataset/Refined/Crypto_Data/*/*/*/*")
currency_value = spark.read.parquet("/content/dataset/Refined/Currency_Value/*/*/*/*")

currency.show()
crypto_data.show()
currency_value.show()

+--------------------+-----------------------+---------------+-------------+
|         currency_id|currency_special_symbol|currency_symbol|currency_type|
+--------------------+-----------------------+---------------+-------------+
|      argentine-peso|                      $|            ARS|            f|
|        binance-coin|                   null|            BNB|            c|
|            dogecoin|                   null|           DOGE|            c|
|   australian-dollar|                      $|            AUD|            f|
|     norwegian-krone|                     kr|            NOK|            f|
|    sri-lankan-rupee|                      ₨|            LKR|            f|
|      afghan-afghani|                     ؋ |            AFN|            f|
|       ghanaian-cedi|                      ¢|            GHS|            f|
|             bitcoin|                      ₿|            BTC|            c|
|         yemeni-rial|                      ﷼|            YER|            f|