# Checking JSON response

In [None]:
import requests
import json
import pymysql
import time

# CoinCap asset endpoints whose JSON payloads are printed below for inspection.
l = [
    "http://api.coincap.io/v2/assets/bitcoin",
    "http://api.coincap.io/v2/assets/ethereum",
    "http://api.coincap.io/v2/assets/tether",
    "http://api.coincap.io/v2/assets/polygon",
    "http://api.coincap.io/v2/assets/xrp",
]

for url in l:
    try:
        # A timeout keeps the cell from hanging forever if the API stops responding.
        response = requests.request("GET", url, timeout=10)
        # Surface HTTP errors (4xx/5xx) instead of trying to parse an error page as JSON.
        response.raise_for_status()
        json_data = json.loads(response.text)
    except (requests.RequestException, ValueError) as e:
        # ValueError covers json.JSONDecodeError for bodies that are not valid JSON.
        # Report the failing endpoint and keep checking the remaining ones.
        print(f"Request to {url} failed: {e}")
        continue

    print(json_data)

# Check Database Connectivity

In [None]:
import pymysql

# MySQL database configuration
# NOTE(review): credentials are hard-coded in the notebook; consider loading them
# from the environment or a config file before sharing this file.
DB_HOST = 'localhost'
DB_USER = 'root'
DB_PASSWORD = 'RootPass@#007'
DB_NAME = 'pyspark_project'

# Bind the name before the try block: if connect() raises, the finally clause
# would otherwise hit a NameError (or a stale, already-closed connection left
# over from a previous run of this cell).
connection = None
try:
    # Connecting to MySQL server
    connection = pymysql.connect(host=DB_HOST,
                                 user=DB_USER,
                                 password=DB_PASSWORD,
                                 database=DB_NAME,  # `db` is a deprecated alias of `database`
                                 charset='utf8mb4',
                                 cursorclass=pymysql.cursors.DictCursor)
    print("Connection successful!")
except pymysql.Error as e:
    print(f"Connection failed: {e}")
finally:
    # Close only a connection that was actually opened by this cell
    if connection is not None:
        connection.close()


# Plotting Graphs

In [None]:
from pyspark.sql import SparkSession
import plotly.graph_objects as go
import time
from IPython.display import clear_output


# Path to the MySQL Connector/J JAR file; it is placed on Spark's classpath below
# so that Spark can communicate with the database
mysql_connector_jar = "/Users/svashi/Documents/kafka/kafka_2.12-3.5.0/mysql-connector-j-8.4.0/mysql-connector-j-8.4.0.jar"

# Create (or reuse) the SparkSession with the connector JAR on both the driver
# and the executor classpath
spark = (
    SparkSession.builder
    .appName("YourAppName")
    .config("spark.driver.extraClassPath", mysql_connector_jar)
    .config("spark.executor.extraClassPath", mysql_connector_jar)
    .getOrCreate()
)

# MySQL database configuration
DB_HOST = "localhost"
DB_USER = "root"
DB_PASSWORD = "RootPass@#007"
DB_NAME = "pyspark_project"


def fetch_data(symbol):
    """Read the ``crypto_prices`` table over JDBC and return the rows for *symbol*.

    The table is loaded through the module-level ``spark`` session using the
    ``DB_*`` connection settings, filtered on its ``symbol`` column, and
    converted to a pandas DataFrame.
    """
    # NOTE(review): the read is split into 10 partitions on ``idx`` with fixed
    # bounds 1..100 -- confirm these bounds still suit the table as it grows.
    jdbc_options = {
        "url": f"jdbc:mysql://{DB_HOST}/{DB_NAME}",
        "driver": "com.mysql.cj.jdbc.Driver",
        "dbtable": "crypto_prices",
        "user": DB_USER,
        "password": DB_PASSWORD,
        "partitionColumn": "idx",
        "lowerBound": 1,
        "upperBound": 100,
        "numPartitions": 10,
    }
    prices = spark.read.format("jdbc").options(**jdbc_options).load()
    matching_rows = prices.filter(prices.symbol == symbol)
    return matching_rows.toPandas()

 
symbols = ['BTC', 'ETH', 'XRP', 'USDT', 'MATIC']  # Crypto coin symbols to chart
colors = ['blue', 'green', 'red', 'purple', 'orange']  # Line color for each symbol, matched by position

# Build one price-over-time figure per symbol from the rows currently in the database
figures = []
for symbol, color in zip(symbols, colors):
    data = fetch_data(symbol)
    price_trace = go.Scatter(
        x=data['timestamp'],
        y=data['priceUsd'],
        mode='lines',
        name=symbol,
        line={'color': color},
    )
    fig = go.Figure()
    fig.add_trace(price_trace)
    fig.update_layout(
        title=f'Crypto Price - {symbol}',
        xaxis_title='Timestamp',
        yaxis_title='Price (USD)',
        xaxis={'rangeslider': {'visible': True}},
        showlegend=True,
    )
    figures.append(fig)

# Display the initial plots
for fig in figures:
    fig.show()

# Refresh loop: every 10 seconds clear the cell output, re-read each symbol's
# rows, swap them into the existing trace, and redraw all figures.
# Runs until the cell is interrupted.
while True:
    time.sleep(10)

    clear_output(wait=True)

    for fig, symbol in zip(figures, symbols):
        data = fetch_data(symbol)
        trace = fig.data[0]  # each figure holds exactly the one trace added above
        trace.x = data['timestamp']
        trace.y = data['priceUsd']

    for fig in figures:
        fig.show()
    
