In [2]:
import sys

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Put the directory three levels up (presumably the repository root) on the
# import path so the project packages `utils` and `pipelines` resolve.
sys.path.append("../../..")
from utils.get_date_today_iso_string import get_date_today_iso_string
from pipelines.cleaning_functions import (
    null_value_clean,
    whitespaces_clean,
    negative_value_clean,
    duplicates_clean,
    datestring_format_and_null_check,
    rank_clean_with_flag,
    change_percentage_check,
    # The two helpers below are called by the check cells further down but were
    # missing from this import list, so a fresh-kernel "Run All" raised NameError.
    # NOTE(review): assumed to live in pipelines.cleaning_functions alongside the
    # other *_check helpers -- confirm.
    null_or_negative_value_check,
    null_value_check,
)



spark = SparkSession.builder.appName("ReadParquetFile").getOrCreate()

# Today's bronze partition is the input of this notebook.
extraction_date = get_date_today_iso_string()

bronze_to_silver_df = spark.read.parquet(f"../../../data/bronze/coin_markets/parquet/{extraction_date}/")

# schema_order.txt holds the column names as one comma-separated list.
# Strip each entry and drop empty ones so a trailing newline or a space after
# a comma does not turn into an unresolvable column name in select().
with open("../../../notebooks/bronze/schema_order.txt") as f:
    bronze_to_silver_df_column_order = [
        column_name.strip()
        for column_name in f.read().split(",")
        if column_name.strip()
    ]

# Reorder the columns to the order recorded in schema_order.txt.
bronze_to_silver_df = bronze_to_silver_df.select(*bronze_to_silver_df_column_order)

# TODO: the per-column cells below are repetitive; factor them into a data-driven loop.


In [3]:
# Clean the `id` column: run null_value_clean, whitespaces_clean and
# duplicates_clean on it, in that order, feeding each result into the next.
id_cleaning_functions = [
    null_value_clean,
    whitespaces_clean,
    duplicates_clean,
]

for clean_step in id_cleaning_functions:
    bronze_to_silver_df = clean_step(bronze_to_silver_df, "id")

# Preview the first five rows after cleaning.
bronze_to_silver_df.show(5)

+--------+------+--------+--------------------+-------------+----------+---------------+-----------------------+------------+--------+--------+----------------+---------------------------+---------------------+--------------------------------+--------------------+-------------------+----------+------+---------------------+--------------------+--------+---------------------+--------------------+--------------------+--------------------+
|      id|symbol|    name|               image|current_price|market_cap|market_cap_rank|fully_diluted_valuation|total_volume|high_24h| low_24h|price_change_24h|price_change_percentage_24h|market_cap_change_24h|market_cap_change_percentage_24h|  circulating_supply|       total_supply|max_supply|   ath|ath_change_percentage|            ath_date|     atl|atl_change_percentage|            atl_date|                 roi|        last_updated|
+--------+------+--------+--------------------+-------------+----------+---------------+-----------------------+--------

In [4]:
# Clean the `symbol` column: run null_value_clean and then whitespaces_clean on it.
symbol_cleaning_functions = [
    null_value_clean,
    whitespaces_clean,
]

for clean_step in symbol_cleaning_functions:
    bronze_to_silver_df = clean_step(bronze_to_silver_df, "symbol")

# Preview the first five rows after cleaning.
bronze_to_silver_df.show(5)

+--------+------+--------+--------------------+-------------+----------+---------------+-----------------------+------------+--------+--------+----------------+---------------------------+---------------------+--------------------------------+--------------------+-------------------+----------+------+---------------------+--------------------+--------+---------------------+--------------------+--------------------+--------------------+
|      id|symbol|    name|               image|current_price|market_cap|market_cap_rank|fully_diluted_valuation|total_volume|high_24h| low_24h|price_change_24h|price_change_percentage_24h|market_cap_change_24h|market_cap_change_percentage_24h|  circulating_supply|       total_supply|max_supply|   ath|ath_change_percentage|            ath_date|     atl|atl_change_percentage|            atl_date|                 roi|        last_updated|
+--------+------+--------+--------------------+-------------+----------+---------------+-----------------------+--------

In [5]:
# Clean the `name` column: run null_value_clean and then whitespaces_clean on it.
# The target column must be "name" here; `symbol` is handled by its own cell.

name_cleaning_functions = [null_value_clean, whitespaces_clean]

for function in name_cleaning_functions:
    bronze_to_silver_df = function(bronze_to_silver_df, "name")

# Preview the first five rows after cleaning.
bronze_to_silver_df.show(5)

+--------+------+--------+--------------------+-------------+----------+---------------+-----------------------+------------+--------+--------+----------------+---------------------------+---------------------+--------------------------------+--------------------+-------------------+----------+------+---------------------+--------------------+--------+---------------------+--------------------+--------------------+--------------------+
|      id|symbol|    name|               image|current_price|market_cap|market_cap_rank|fully_diluted_valuation|total_volume|high_24h| low_24h|price_change_24h|price_change_percentage_24h|market_cap_change_24h|market_cap_change_percentage_24h|  circulating_supply|       total_supply|max_supply|   ath|ath_change_percentage|            ath_date|     atl|atl_change_percentage|            atl_date|                 roi|        last_updated|
+--------+------+--------+--------------------+-------------+----------+---------------+-----------------------+--------

In [6]:
# Pass the `image` column through null_value_clean and keep the returned DataFrame.
# URL-format validation for `image` is not implemented yet; the original author
# noted that no malformed URLs were present in the data.
bronze_to_silver_df = null_value_clean(
    bronze_to_silver_df,
    "image",
)

# Preview the first five rows after cleaning.
bronze_to_silver_df.show(5)

+--------+------+--------+--------------------+-------------+----------+---------------+-----------------------+------------+--------+--------+----------------+---------------------------+---------------------+--------------------------------+--------------------+-------------------+----------+------+---------------------+--------------------+--------+---------------------+--------------------+--------------------+--------------------+
|      id|symbol|    name|               image|current_price|market_cap|market_cap_rank|fully_diluted_valuation|total_volume|high_24h| low_24h|price_change_24h|price_change_percentage_24h|market_cap_change_24h|market_cap_change_percentage_24h|  circulating_supply|       total_supply|max_supply|   ath|ath_change_percentage|            ath_date|     atl|atl_change_percentage|            atl_date|                 roi|        last_updated|
+--------+------+--------+--------------------+-------------+----------+---------------+-----------------------+--------

In [7]:
# Pass `current_price` through negative_value_clean and keep the returned DataFrame.
bronze_to_silver_df = negative_value_clean(
    bronze_to_silver_df,
    "current_price",
)

# Preview the first five rows after cleaning.
bronze_to_silver_df.show(5)


+--------+------+--------+--------------------+-------------+----------+---------------+-----------------------+------------+--------+--------+----------------+---------------------------+---------------------+--------------------------------+--------------------+-------------------+----------+------+---------------------+--------------------+--------+---------------------+--------------------+--------------------+--------------------+
|      id|symbol|    name|               image|current_price|market_cap|market_cap_rank|fully_diluted_valuation|total_volume|high_24h| low_24h|price_change_24h|price_change_percentage_24h|market_cap_change_24h|market_cap_change_percentage_24h|  circulating_supply|       total_supply|max_supply|   ath|ath_change_percentage|            ath_date|     atl|atl_change_percentage|            atl_date|                 roi|        last_updated|
+--------+------+--------+--------------------+-------------+----------+---------------+-----------------------+--------

In [8]:
# Pass `market_cap` through negative_value_clean and keep the returned DataFrame.
bronze_to_silver_df = negative_value_clean(
    bronze_to_silver_df,
    "market_cap",
)

# Preview the first five rows after cleaning.
bronze_to_silver_df.show(5)


+--------+------+--------+--------------------+-------------+----------+---------------+-----------------------+------------+--------+--------+----------------+---------------------------+---------------------+--------------------------------+--------------------+-------------------+----------+------+---------------------+--------------------+--------+---------------------+--------------------+--------------------+--------------------+
|      id|symbol|    name|               image|current_price|market_cap|market_cap_rank|fully_diluted_valuation|total_volume|high_24h| low_24h|price_change_24h|price_change_percentage_24h|market_cap_change_24h|market_cap_change_percentage_24h|  circulating_supply|       total_supply|max_supply|   ath|ath_change_percentage|            ath_date|     atl|atl_change_percentage|            atl_date|                 roi|        last_updated|
+--------+------+--------+--------------------+-------------+----------+---------------+-----------------------+--------

In [None]:
# Pass `market_cap_rank` through negative_value_clean and keep the returned DataFrame.
bronze_to_silver_df = negative_value_clean(
    bronze_to_silver_df,
    "market_cap_rank",
)

# Preview the first five rows after cleaning.
bronze_to_silver_df.show(5)

+--------+------+--------+--------------------+-------------+----------+---------------+-----------------------+------------+--------+--------+----------------+---------------------------+---------------------+--------------------------------+--------------------+-------------------+----------+------+---------------------+--------------------+--------+---------------------+--------------------+--------------------+--------------------+
|      id|symbol|    name|               image|current_price|market_cap|market_cap_rank|fully_diluted_valuation|total_volume|high_24h| low_24h|price_change_24h|price_change_percentage_24h|market_cap_change_24h|market_cap_change_percentage_24h|  circulating_supply|       total_supply|max_supply|   ath|ath_change_percentage|            ath_date|     atl|atl_change_percentage|            atl_date|                 roi|        last_updated|
+--------+------+--------+--------------------+-------------+----------+---------------+-----------------------+--------

In [None]:
# Flag rows whose reported market_cap_rank does not match the rank derived from
# market_cap. rank_clean_with_flag is expected to add the boolean column
# `market_cap_rank_mismatch_flag` that the preview below filters on.
bronze_to_silver_df = rank_clean_with_flag(
    bronze_to_silver_df,
    "market_cap",
    "market_cap_rank",
)

# Show up to five of the flagged rows.
bronze_to_silver_df.where(F.col("market_cap_rank_mismatch_flag")).show(5)

+-------------+------+------------+--------------------+-------------+----------+---------------+-----------------------+------------+--------+--------+----------------+---------------------------+---------------------+--------------------------------+-------------------+--------------------+----------+-----+---------------------+--------------------+---------+---------------------+--------------------+----+--------------------+------------------------+-----------------------------+
|           id|symbol|        name|               image|current_price|market_cap|market_cap_rank|fully_diluted_valuation|total_volume|high_24h| low_24h|price_change_24h|price_change_percentage_24h|market_cap_change_24h|market_cap_change_percentage_24h| circulating_supply|        total_supply|max_supply|  ath|ath_change_percentage|            ath_date|      atl|atl_change_percentage|            atl_date| roi|        last_updated|expected_market_cap_rank|market_cap_rank_mismatch_flag|
+-------------+------+--

In [10]:
# Report rows whose fully_diluted_valuation is null or negative.
# NOTE(review): confirm null_or_negative_value_check is imported in the setup
# cell; a fresh-kernel run raises NameError otherwise.
null_or_negative_value_check(
    bronze_to_silver_df,
    "fully_diluted_valuation",
)

+---+-------------------------------+
| id|invalid_fully_diluted_valuation|
+---+-------------------------------+
+---+-------------------------------+



In [11]:
# Report rows whose total_volume is null or negative.
null_or_negative_value_check(
    bronze_to_silver_df,
    "total_volume",
)

+---+--------------------+
| id|invalid_total_volume|
+---+--------------------+
+---+--------------------+



In [12]:
# Report rows whose high_24h is null or negative.
null_or_negative_value_check(
    bronze_to_silver_df,
    "high_24h",
)

+------------+----------------+
|          id|invalid_high_24h|
+------------+----------------+
|figure-heloc|            NULL|
+------------+----------------+



In [13]:
# Report rows whose low_24h is null or negative.
null_or_negative_value_check(
    bronze_to_silver_df,
    "low_24h",
)

+------------+---------------+
|          id|invalid_low_24h|
+------------+---------------+
|figure-heloc|           NULL|
+------------+---------------+



In [14]:
# Sanity check: list any row whose 24h high is lower than its 24h low.
(
    bronze_to_silver_df
    .where(F.col("high_24h") < F.col("low_24h"))
    .select("id", "high_24h", "low_24h")
    .show()
)

+---+--------+-------+
| id|high_24h|low_24h|
+---+--------+-------+
+---+--------+-------+



In [15]:
# Report rows whose price_change_24h is null.
# NOTE(review): confirm null_value_check is imported in the setup cell;
# a fresh-kernel run raises NameError otherwise.
null_value_check(
    bronze_to_silver_df,
    "price_change_24h",
)

+------------+---------------------+
|          id|null_price_change_24h|
+------------+---------------------+
|figure-heloc|                 NULL|
+------------+---------------------+



In [16]:
# Report rows whose price_change_percentage_24h is null.
null_value_check(
    bronze_to_silver_df,
    "price_change_percentage_24h",
)

+------------+--------------------------------+
|          id|null_price_change_percentage_24h|
+------------+--------------------------------+
|figure-heloc|                            NULL|
+------------+--------------------------------+



In [17]:
# Consistency check: price_change_percentage_24h should equal price_change_24h
# divided by the previous price. Per the original note the comparison uses a
# tolerance of 0.1; that value is set inside change_percentage_check, not here.
change_percentage_check(
    bronze_to_silver_df,
    "current_price",
    "price_change_24h",
    "price_change_percentage_24h",
)


+---+-------------+----------------+----------------------+-----------------------------------+---------------------------+
| id|current_price|price_change_24h|previous_current_price|derived_price_change_percentage_24h|price_change_percentage_24h|
+---+-------------+----------------+----------------------+-----------------------------------+---------------------------+
+---+-------------+----------------+----------------------+-----------------------------------+---------------------------+



In [18]:
# Report rows whose market_cap_change_24h is null.
null_value_check(
    bronze_to_silver_df,
    "market_cap_change_24h",
)

+------------+--------------------------+
|          id|null_market_cap_change_24h|
+------------+--------------------------+
|figure-heloc|                      NULL|
+------------+--------------------------+



In [19]:
# Report rows whose market_cap_change_percentage_24h is null.
null_value_check(
    bronze_to_silver_df,
    "market_cap_change_percentage_24h",
)

+------------+-------------------------------------+
|          id|null_market_cap_change_percentage_24h|
+------------+-------------------------------------+
|figure-heloc|                                 NULL|
+------------+-------------------------------------+



In [20]:
# Consistency check: market_cap_change_percentage_24h should equal
# market_cap_change_24h divided by the previous market cap. Per the original
# note the comparison uses a tolerance of 0.1; that value is set inside
# change_percentage_check, not here.
change_percentage_check(
    bronze_to_silver_df,
    "market_cap",
    "market_cap_change_24h",
    "market_cap_change_percentage_24h",
)


+---+----------+---------------------+-------------------+----------------------------------------+--------------------------------+
| id|market_cap|market_cap_change_24h|previous_market_cap|derived_market_cap_change_percentage_24h|market_cap_change_percentage_24h|
+---+----------+---------------------+-------------------+----------------------------------------+--------------------------------+
+---+----------+---------------------+-------------------+----------------------------------------+--------------------------------+



In [21]:
# Report rows whose circulating_supply is null or negative.
null_or_negative_value_check(
    bronze_to_silver_df,
    "circulating_supply",
)

+---+--------------------------+
| id|invalid_circulating_supply|
+---+--------------------------+
+---+--------------------------+



In [22]:
# Report rows whose total_supply is null or negative.
null_or_negative_value_check(
    bronze_to_silver_df,
    "total_supply",
)

+---+--------------------+
| id|invalid_total_supply|
+---+--------------------+
+---+--------------------+



In [23]:
# Report rows whose max_supply is null or negative.
# NOTE(review): the recorded run listed many ids with a null max_supply, so null
# may be a legitimate value for this column -- confirm before turning this
# check into a cleaning step.
null_or_negative_value_check(
    bronze_to_silver_df,
    "max_supply",
)

+--------------------+------------------+
|                  id|invalid_max_supply|
+--------------------+------------------+
|            ethereum|              NULL|
|              tether|              NULL|
|              solana|              NULL|
|            usd-coin|              NULL|
|        staked-ether|              NULL|
|                tron|              NULL|
|            dogecoin|              NULL|
|       wrapped-steth|              NULL|
|        figure-heloc|              NULL|
|  wrapped-beacon-eth|              NULL|
|        wrapped-eeth|              NULL|
|                usds|              NULL|
|binance-bridged-u...|              NULL|
|           leo-token|              NULL|
|         ethena-usde|              NULL|
|coinbase-wrapped-btc|              NULL|
|              monero|              NULL|
|                weth|              NULL|
|           shiba-inu|              NULL|
|            polkadot|              NULL|
+--------------------+------------

In [24]:
# Report rows whose ath is null or negative.
null_or_negative_value_check(
    bronze_to_silver_df,
    "ath",
)

+---+-----------+
| id|invalid_ath|
+---+-----------+
+---+-----------+



In [25]:
# Report rows whose ath_change_percentage is null.
null_value_check(
    bronze_to_silver_df,
    "ath_change_percentage",
)

+---+--------------------------+
| id|null_ath_change_percentage|
+---+--------------------------+
+---+--------------------------+



In [26]:
# Validate ath_date: the check is intended to flag values that are null,
# not in the expected date format, or dated in the future.
datestring_format_and_null_check(
    bronze_to_silver_df,
    "ath_date",
)

+---+--------------------+
| id|invalid_ath_date_url|
+---+--------------------+
+---+--------------------+



In [27]:
# Report rows whose atl is null or negative.
null_or_negative_value_check(
    bronze_to_silver_df,
    "atl",
)

+---+-----------+
| id|invalid_atl|
+---+-----------+
+---+-----------+



In [28]:
# Report rows whose atl_change_percentage is null.
null_value_check(
    bronze_to_silver_df,
    "atl_change_percentage",
)

+---+--------------------------+
| id|null_atl_change_percentage|
+---+--------------------------+
+---+--------------------------+



In [29]:
# Validate atl_date: the check is intended to flag values that are null,
# not in the expected date format, or dated in the future.
datestring_format_and_null_check(
    bronze_to_silver_df,
    "atl_date",
)

+---+--------------------+
| id|invalid_atl_date_url|
+---+--------------------+
+---+--------------------+



In [30]:
# Inspect the `roi` struct: for rows where roi is present, list those whose
# currency is null or carries leading/trailing whitespace, or whose percentage
# or times field is null.
(
    bronze_to_silver_df
    .where(
        F.col("roi").isNotNull()
        & (
            F.col("roi.currency").isNull()
            | (F.col("roi.currency") != F.trim(F.col("roi.currency")))
            | F.col("roi.percentage").isNull()
            | F.col("roi.times").isNull()
        )
    )
    .select("id", "roi.currency", "roi.percentage", "roi.times")
    .show(truncate=False)
)

+---+--------+----------+-----+
|id |currency|percentage|times|
+---+--------+----------+-----+
+---+--------+----------+-----+



In [31]:
# Validate last_updated: the check is intended to flag values that are null,
# not in the expected date format, or dated in the future.
datestring_format_and_null_check(
    bronze_to_silver_df,
    "last_updated",
)

+---+------------------------+
| id|invalid_last_updated_url|
+---+------------------------+
+---+------------------------+



In [32]:
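# TODO: write the silver-layer output. The commented-out sketch below is a
# placeholder only: it refers to `bronze_df`, "price" and "timestamp", while the
# DataFrame in this notebook is `bronze_to_silver_df` and the displayed schema
# has no "price" or "timestamp" column.
# silver_df = (
#     bronze_df
#     .dropDuplicates()
#     .filter(bronze_df["price"].isNotNull())
#     .withColumnRenamed("timestamp", "event_time")
# )

# silver_df.write.mode("overwrite").parquet("silver/crypto_data.parquet")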