In [10]:
import redis
import pandas as pd
from pyspark.sql import SparkSession
from elasticsearch import Elasticsearch
import pyspark.sql.functions as psf
import pyspark.sql.types as pst


# run news_to_redis !!

connection = redis.StrictRedis(
    host='localhost',  # Replace with your Redis server host
    port=6379,         # Default Redis port
    db=0,              # Default database
    decode_responses=True  # Ensures responses are returned as strings
)


data = {"source":[], "min_sentiment":[], "max_sentiment": [], "avg_sentiment": [], "std_sentiment": []}
keys = []
# iterate over all saved news aggregates
for key in connection.scan_iter("sentiments_aggregates:*"):
    keys.append(key)
    fields = connection.hkeys(key)
    for field in data.keys():
        if field in fields:
            data[field].append(connection.hget(key, field))
        else:
            data[field].append(None)

df = pd.DataFrame(data)
df["key"] = keys
connection.close()
df

Unnamed: 0,source,min_sentiment,max_sentiment,avg_sentiment,std_sentiment,key
0,Cryptocurrency News,0.6588,0.6588,0.6588,,sentiments_aggregates:9186c9573b164ce9859192bd...
1,Cointelegraph,-0.2023,0.6597,0.0094934210526315,0.2745888593688274,sentiments_aggregates:bd17aefb4f9b432b92254d8f...
2,CoinDesk,-0.0772,0.6602,0.063578947368421,0.3102485429797417,sentiments_aggregates:8b161887a05b479190225752...


In [5]:
# run logs_to_elasticsearch

es = Elasticsearch(
    hosts=["http://localhost:9200"],
)



In [97]:
# Start a scroll query
response = es.search(
    index="tickers",
    query={"match_all": {}},
    size=100,
    scroll="1m"  # Keep the search context alive for 1 minute
)

scroll_id = response["_scroll_id"]

data = []
while True:
    # Fetch the next batch of documents
    response = es.scroll(scroll_id=scroll_id, scroll="1m")
    
    hits = response["hits"]["hits"]
    if not hits:
        break  # Exit when no more documents are found
    
    for hit in hits:
        data.append(hit['_source'])

    scroll_id = response["_scroll_id"]

# Clear the scroll context when done
es.clear_scroll(scroll_id=scroll_id)

df = pd.DataFrame(data)
df.describe()

Unnamed: 0,price,open_24h,volume_24h,low_24h,high_24h,volume_30d,best_bid,best_bid_size,best_ask,best_ask_size,trade_id,last_size,date,timestamp
count,40877.0,40877.0,40877.0,40877.0,40877.0,40877.0,40877.0,40877.0,40877.0,40877.0,40877.0,40877.0,40877.0,40877.0
mean,98506.807279,98259.989388,8971.079319,97746.880334,100235.576467,431979.158848,98505.445436,0.09507203,98507.80813,0.1282163,745704200.0,0.00487807,1734972000000.0,1735028000000.0
std,835.769201,593.214932,4463.253918,429.09443,1537.853069,88006.550128,835.940173,0.1491669,835.894191,0.4871345,9181414.0,0.03709513,1187025000.0,1194702000.0
min,97550.47,97714.25,4698.150348,97516.65,98969.92,359404.266655,97550.47,1e-08,97559.16,8e-08,734515200.0,1e-08,1733530000000.0,1733570000000.0
25%,97785.56,97804.67,5329.328852,97516.65,98969.92,359542.173125,97783.94,0.00135934,97786.17,0.00334039,734562400.0,1.603e-05,1733530000000.0,1733579000000.0
50%,98003.21,97976.46,5811.546324,97516.65,98969.92,359616.091494,98001.47,0.0322566,98003.25,0.03674168,753248800.0,0.00015166,1735949000000.0,1736009000000.0
75%,99476.31,98489.44,14534.043147,97581.44,102104.12,538838.056939,99475.94,0.11573,99477.86,0.1921731,753263500.0,0.00157315,1735949000000.0,1736012000000.0
max,99795.52,99581.58,15253.266547,98666.26,102104.12,539111.734722,99784.01,1.49021,99795.52,12.89041,753278200.0,4.847932,1735949000000.0,1736015000000.0


In [111]:
from datetime import datetime
min_timestamp = df["timestamp"].min()
max_timestamp = df["timestamp"].max()
print("tickers min timestamp: ", min_timestamp)
print("tickers max timestamp: ", max_timestamp)

tickers min timestamp:  1733569661083
tickers max timestamp:  1736015065228


In [126]:
sdf = spark.read.parquet("hdfs://localhost/crypto/exchange/ticker")

hdfs_count = sdf.filter(psf.to_timestamp(psf.col("time")) <= datetime.fromtimestamp(max_timestamp / 1000.0)).filter(psf.to_timestamp(psf.col("time")) >= datetime.fromtimestamp(min_timestamp / 1000.0)).count()
print("ticker elastic count: ", df.shape[0])
print("ticker hdfs count: ", hdfs_count)

ticker elastic count:  40877
ticker hdfs count:  35999


In [68]:
# Start a scroll query
response = es.search(
    index="transactions",
    query={"match_all": {}},
    size=100,
    scroll="1m"  # Keep the search context alive for 1 minute
)

scroll_id = response["_scroll_id"]

data = []
while True:
    # Fetch the next batch of documents
    response = es.scroll(scroll_id=scroll_id, scroll="1m")
    
    hits = response["hits"]["hits"]
    if not hits:
        break  # Exit when no more documents are found
    
    for hit in hits:
        data.append(hit['_source'])

    scroll_id = response["_scroll_id"]

# Clear the scroll context when done
es.clear_scroll(scroll_id=scroll_id)

edf = pd.DataFrame(data)
edf.describe()

Unnamed: 0,timestamp
count,5817.0
mean,1736012000000.0
std,2001078.0
min,1736008000000.0
25%,1736010000000.0
50%,1736012000000.0
75%,1736013000000.0
max,1736015000000.0


In [87]:
edf.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5817 entries, 0 to 5816
Data columns (total 6 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   op          5817 non-null   object
 1   time        5817 non-null   object
 2   hash        5817 non-null   object
 3   relayed_by  5817 non-null   object
 4   event       5817 non-null   object
 5   timestamp   5817 non-null   int64 
dtypes: int64(1), object(5)
memory usage: 272.8+ KB


In [91]:
edf.size

34902

In [92]:
print("transactions max time: ", edf["time"].max())
print("transactions min time: ", edf["time"].min())
min_timestamp = edf["timestamp"].min()
max_timestamp = edf["timestamp"].max()
print("transactions max timestamp: ", max_timestamp)
print("transactions min timestamp: ", min_timestamp)

transactions max time:  2025-01-04 18:26:03
transactions min time:  2025-01-04 16:30:47
transactions max timestamp:  1736015163000
transactions min timestamp:  1736008247000


In [93]:
spark = SparkSession.builder.getOrCreate()


df = spark.read.parquet("hdfs://localhost/crypto/blockchain/transactions")
print("transaction count elastic: ", edf.size)
print("transaction count hdfs: ", df.filter(psf.col("x.time") >= min_timestamp // 1000).filter(psf.col("x.time") <= max_timestamp // 1000).count())

transaction count elastic:  34902


[Stage 41:>                                                       (0 + 16) / 16]

transaction count hdfs:  45935


                                                                                

In [128]:
# calculate training dataset !
training_df = spark.read.parquet("hdfs://localhost/user/hive/warehouse/training_data.db/full_training_dataset")
training_df.count()

35999

In [None]:
training_df.select([
    psf.sum(psf.col(c).isNull().cast("int")).alias(c) for c in training_df.columns
]).show(vertical=True)

-RECORD 0--------------------------------
 price                             | 0   
 open_24h                          | 0   
 volume_24h                        | 0   
 low_24h                           | 0   
 high_24h                          | 0   
 volume_30d                        | 0   
 best_bid                          | 0   
 best_bid_size                     | 0   
 best_ask                          | 0   
 best_ask_size                     | 0   
 avg_price_last_15s                | 0   
 avg_price_last_30s                | 0   
 avg_price_last_60s                | 0   
 transaction_count_last_15s        | 0   
 transaction_count_last_30s        | 0   
 transaction_count_last_60s        | 0   
 coindesk_avg_sentiment            | 0   
 coindesk_std_sentiment            | 0   
 coindesk_max_sentiment            | 0   
 coindesk_min_sentiment            | 0   
 cointelegraph_avg_sentiment       | 0   
 cointelegraph_std_sentiment       | 0   
 cointelegraph_max_sentiment      

25/01/04 23:47:32 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10000 milliseconds]. This timeout is controlled by spark.executor.heartbeatInterval
	at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1219)
	at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:295)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java

In [3]:
from pyspark.ml.classification import LogisticRegressionModel
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# start online ml

model = LogisticRegressionModel.load("hdfs://localhost/user/hadoop/model.model/")
model

LogisticRegressionModel: uid=LogisticRegression_a06db5eca165, numClasses=2, numFeatures=28

In [20]:
response = es.search(
    index="predictions",
    query={"match_all": {}},
    size=1000,
)

In [21]:
df = pd.DataFrame([x["_source"] for x in response["hits"]["hits"]])
df

Unnamed: 0,price,open_24h,volume_24h,low_24h,high_24h,volume_30d,best_bid,best_bid_size,best_ask,best_ask_size,...,cointelegraph_std_sentiment,cointelegraph_max_sentiment,cointelegraph_min_sentiment,cryptocurrency_news_avg_sentiment,cryptocurrency_news_std_sentiment,cryptocurrency_news_max_sentiment,cryptocurrency_news_min_sentiment,timestamp,event,prediction
0,97574.10,97853.76,2257.644636,97516.65,98761.02,343263.294827,97574.09,0.025027,97574.10,0.194815,...,0.347298,0.3182,-0.7125,0.0,0.0,0.0,0.0,1736079018393,ticker,0.0
1,97574.10,97853.76,2257.644638,97516.65,98761.02,343263.294828,97574.09,0.025027,97574.10,0.251814,...,0.347298,0.3182,-0.7125,0.0,0.0,0.0,0.0,1736079018629,ticker,0.0
2,97574.10,97853.76,2257.644643,97516.65,98761.02,343263.294833,97574.09,0.025027,97574.10,0.251809,...,0.347298,0.3182,-0.7125,0.0,0.0,0.0,0.0,1736079019682,ticker,0.0
3,97574.10,97853.76,2257.644643,97516.65,98761.02,343263.294834,97574.09,0.025027,97574.10,0.251808,...,0.347298,0.3182,-0.7125,0.0,0.0,0.0,0.0,1736079019833,ticker,0.0
4,97574.10,97853.76,2257.644694,97516.65,98761.02,343263.294885,97574.09,0.025027,97574.10,0.251757,...,0.347298,0.3182,-0.7125,0.0,0.0,0.0,0.0,1736079019961,ticker,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
995,97703.74,97853.76,2263.656472,97516.65,98761.02,343269.306663,97703.73,0.451321,97703.74,0.011594,...,0.347298,0.3182,-0.7125,0.0,0.0,0.0,0.0,1736079362947,ticker,1.0
996,97703.74,97853.76,2263.668066,97516.65,98761.02,343269.318256,97703.73,0.451721,97704.54,0.004880,...,0.347298,0.3182,-0.7125,0.0,0.0,0.0,0.0,1736079363178,ticker,1.0
997,97708.40,97853.76,2263.672960,97516.65,98761.02,343269.323150,97708.39,0.004995,97708.40,0.000220,...,0.347298,0.3182,-0.7125,0.0,0.0,0.0,0.0,1736079363343,ticker,1.0
998,97708.39,97853.76,2263.672951,97516.65,98761.02,343269.323141,97708.39,0.004995,97708.40,0.000229,...,0.347298,0.3182,-0.7125,0.0,0.0,0.0,0.0,1736079363317,ticker,1.0


In [22]:
df["prediction"].max()

np.float64(1.0)

In [23]:
df["prediction"].min()

np.float64(0.0)