<a href="https://colab.research.google.com/github/AnirudhSinghBhadauria/pyspark/blob/main/crypto_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [124]:
import time
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

import numpy as np
import matplotlib.pyplot as plt

from pyspark.sql.types import *
from pyspark import StorageLevel
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

In [37]:
# Reuse the active Spark session if one exists, otherwise start one for this notebook.
spark = (
    SparkSession.builder
    .appName('crypto-test')
    .getOrCreate()
)

In [133]:
CRYPTO_CSV_PATH = "/content/crypto_tradinds.csv"

# Columns not used anywhere in the analysis; dropped right after loading.
remove_columns = [
    'site_url',
    'github_url',
    'crypto_type',
    'BTC_price_change_1_day',
    'industry_name',
    'price_btc'
]

# Single assignment so re-running the cell always rebuilds the same frame.
# `drop` ignores names that are not present, like the original filter did.
# (The former `skipRows=0` option was removed: it is not an Apache Spark CSV
# option and skipping zero rows is a no-op either way.)
crypto_df = (
    spark.read.format("csv")
    .option("mode", "FAILFAST")      # raise on malformed records instead of nulling them
    .option("inferSchema", "true")   # costs an extra pass over the file to type columns
    .option("header", "true")
    .load(CRYPTO_CSV_PATH)
    .drop(*remove_columns)
)

In [None]:
# Mark the trimmed frame for in-memory caching; persist is lazy, so the cache
# is only filled by the first action that runs on crypto_df.
crypto_df.persist(storageLevel=StorageLevel.MEMORY_ONLY)

In [None]:
# Peek at the Ethereum rows (`where` is an alias of `filter`).
crypto_df.where(col("ticker") == "ETH").show()

#### **Rolling averages** for any cryptocurrency

Plot a trailing rolling average of `price_usd` for a single ticker.

In [84]:
def rolling_average(currency, days):
  """Plot the trailing rolling average of ``price_usd`` for one ticker.

  Args:
    currency: ticker symbol matched against the ``ticker`` column (e.g. "BTC").
    days: window length in rows. Each point averages that row and the
      ``days - 1`` rows before it in ``trade_date`` order. NOTE(review): this
      equals ``days`` calendar days only if there is one row per trade_date
      per ticker; confirm against the data.

  Raises:
    ValueError: if ``days`` is less than 1.
  """
  if days < 1:
    raise ValueError(f"days must be >= 1, got {days}")

  # rowsBetween is inclusive on both ends: -(days - 1)..0 is exactly `days` rows.
  # (The previous -(days)..0 frame averaged days + 1 rows.)
  window = Window.orderBy("trade_date").rowsBetween(-(days - 1), Window.currentRow)

  rolling_average_currency = (
      crypto_df
      .filter(col("ticker") == currency)
      .withColumn("rolling_average", round(avg("price_usd").over(window), 3))
      .select("trade_date", "rolling_average")
      .orderBy("trade_date")
  )

  # Collect both columns in a single pass so x and y stay row-aligned. Two
  # separate collects are not guaranteed to return rows in the same order.
  rows = rolling_average_currency.collect()
  x_value = [row.trade_date for row in rows]
  y_value = [row.rolling_average for row in rows]

  fig, ax = plt.subplots()
  ax.plot(x_value, y_value)
  ax.set(
      title=f"{currency}: {days}-day rolling average",
      xlabel="trade_date",
      ylabel="price_usd",
  )
  plt.show()

In [None]:
# Bitcoin, 7-row trailing window.
rolling_average(currency="BTC", days=7)

In [None]:
SAMPLE_SEED = 42

# Random ~half of the distinct (ticker, crypto_name) pairs. The seed makes the
# sample repeatable across runs on the same data. crypto_df.count() is only an
# upper bound on the row count, so show() prints the whole sample untruncated.
(
  crypto_df
  .select("ticker", "crypto_name")
  .distinct()
  .sample(fraction=0.5, seed=SAMPLE_SEED)
  .show(n=crypto_df.count(), truncate=False)
)

In [None]:
def get_price_inr(usd_price):
    """Convert a USD amount to INR using the anyapi.io exchange endpoint.

    The API key is read from the ``ANYAPI_KEY`` environment variable rather
    than being embedded in the notebook. A key committed to a shared notebook
    is public, so the previously hard-coded key should be revoked.

    Args:
        usd_price: amount in USD to convert.

    Returns:
        The ``converted`` field of the JSON response. Returns ``None`` if the
        key is not configured, the request fails, or the body is not valid JSON.
    """
    import os

    api_key = os.environ.get("ANYAPI_KEY")
    if not api_key:
        print("Error converting price: ANYAPI_KEY environment variable is not set")
        return None

    try:
        response = requests.get(
            "https://anyapi.io/api/v1/exchange/convert",
            params={
                'base': 'USD',
                'to': 'INR',
                'amount': usd_price,
                'apiKey': api_key
            },
            timeout=10
        )
        response.raise_for_status()
        return response.json().get('converted')
    # ValueError covers JSON decode failures on requests versions where they
    # are not a RequestException subclass.
    except (requests.RequestException, ValueError) as e:
        print(f"Error converting price: {e}")
        return None

def ticker(coins_to_fetch=None):
    """Fetch live CoinPaprika tickers and enrich selected coins with an INR quote.

    Args:
        coins_to_fetch: iterable of coin ``name`` values to keep. Defaults to
            Bitcoin, Ethereum, Solana, Sui, Dogecoin and Polygon Ecosystem Token.

    Returns:
        A list of the values returned by ``process_coin``, in the order the API
        listed the coins. Falsy results are skipped. Coins whose processing
        raised are reported and skipped. Returns ``[]`` if the ticker request
        fails or its body is not valid JSON.
    """
    if coins_to_fetch is None:
        coins_to_fetch = {
            'Bitcoin', 'Ethereum', 'Solana', 'Sui',
            'Dogecoin', 'Polygon Ecosystem Token'
        }
    else:
        coins_to_fetch = set(coins_to_fetch)

    try:
        response = requests.get(
            "https://api.coinpaprika.com/v1/tickers/",
            timeout=10
        )
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        print(f"Error fetching tickers: {e}")
        return []

    selected = [coin for coin in data if coin.get('name') in coins_to_fetch]

    processed_coins = []

    with ThreadPoolExecutor() as executor:
        # The INR conversions still run concurrently. Results are gathered in
        # submission order (not completion order) so the resulting rows come
        # out in the same order on every run.
        futures = [(coin, executor.submit(process_coin, coin)) for coin in selected]

        for coin, future in futures:
            try:
                processed_coin = future.result()
            except Exception as e:
                print(f"Error processing coin {coin['name']}: {e}")
                continue
            if processed_coin:
                processed_coins.append(processed_coin)

    return processed_coins

def process_coin(coin):
    """Return a copy of ``coin`` whose ``quotes`` also contains an ``'INR'`` entry.

    Args:
        coin: a ticker dict; ``coin['quotes']['USD']['price']`` is read.

    Returns:
        A new dict. Its ``quotes`` maps ``'INR'`` to whatever ``get_price_inr``
        returns for the USD price. The input ``coin`` (including its nested
        ``quotes`` dict) is left unmodified.
    """
    usd_price = coin['quotes']['USD']['price']

    # dict.copy() is shallow: writing into processed_coin['quotes'] would also
    # mutate coin['quotes']. Copy the quotes level before adding INR.
    quotes = dict(coin['quotes'])
    quotes['INR'] = get_price_inr(usd_price)

    processed_coin = dict(coin)
    processed_coin['quotes'] = quotes
    return processed_coin

if __name__ == "__main__":
    crypto_data = ticker()

    if not crypto_data:
        # spark.createDataFrame cannot infer a schema from an empty list, so
        # report and skip instead of failing with a Spark error.
        print("No ticker data fetched; skipping DataFrame creation.")
    else:
        # NOTE(review): each coin's `quotes` holds a nested dict under "USD" and
        # a scalar (or None) under "INR". Confirm Spark's schema inference
        # accepts this mix on the runtime in use.
        crypto_extracted_df = spark.createDataFrame(crypto_data).select(
            "name", "symbol", "rank", "total_supply",
            "max_supply", "last_updated",
            col("quotes.USD.price").alias("usd_price"),
            col("quotes.USD.volume_24h").alias("usd_volume_24h"),
            col("quotes.USD.market_cap").alias("usd_market_cap"),
            col("quotes.INR").alias("inr_price")
        )

        # Shown inside the guard: the DataFrame only exists on this branch.
        crypto_extracted_df.show()

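#### Removing **outliers**

For each coin, rows whose `price_usd` falls outside the IQR fences $[Q_1 - 1.5\,\mathrm{IQR},\ Q_3 + 1.5\,\mathrm{IQR}]$ are dropped. Here $\mathrm{IQR} = Q_3 - Q_1$, and the quartiles are approximate percentiles computed per `crypto_name`.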

In [160]:
def remove_outliers(df, group_col, target_col, iqr_factor=1.5):
    """Drop rows whose ``target_col`` lies outside per-group IQR fences.

    For each value of ``group_col``, the approximate 25th (Q1) and 75th (Q3)
    percentiles of ``target_col`` are computed with ``percentile_approx``. Only
    rows satisfying
    ``Q1 - iqr_factor * IQR <= value <= Q3 + iqr_factor * IQR`` are kept,
    where ``IQR = Q3 - Q1``.

    Args:
        df: input Spark DataFrame.
        group_col: name of the column whose groups get separate fences.
        target_col: name of the numeric column tested for outliers.
        iqr_factor: fence multiplier. The default of 1.5 matches the previous
            hard-coded behaviour.

    Returns:
        A DataFrame with the columns of ``df``; the join moves ``group_col`` to
        the front. Some rows are also dropped because the bound comparisons
        evaluate to null:
        - rows where ``target_col`` is null;
        - rows where ``group_col`` is null, since the equi-join does not match
          null keys.
    """
    # Quote the name inside the SQL expression so columns with spaces or other
    # special characters still parse; embedded backticks are escaped by doubling.
    quoted_target = "`" + target_col.replace("`", "``") + "`"

    quantiles = (
        df.groupBy(group_col)
        .agg(
            expr(f"percentile_approx({quoted_target}, 0.25)").alias("Q1"),
            expr(f"percentile_approx({quoted_target}, 0.75)").alias("Q3"),
        )
        .withColumn("IQR", col("Q3") - col("Q1"))
        .withColumn("lower_bound", col("Q1") - iqr_factor * col("IQR"))
        .withColumn("upper_bound", col("Q3") + iqr_factor * col("IQR"))
    )

    return (
        df.join(quantiles, on=group_col, how="left")
        .filter(
            (col(target_col) >= col("lower_bound"))
            & (col(target_col) <= col("upper_bound"))
        )
        .drop("Q1", "Q3", "IQR", "lower_bound", "upper_bound")
    )

# Per-coin outlier removal on USD price, then the surviving row count.
cleaned_crypto_df = remove_outliers(
    df=crypto_df,
    group_col="crypto_name",
    target_col="price_usd",
)
cleaned_crypto_df.count()

431644