# 1. Loading Collected Raw Data

- In Databricks, tables can be saved as either 'managed tables' or 'external tables'. In my case, I saved the raw data externally, in GCS.

In [0]:
%fs ls "dbfs:/mnt/demo"

In [0]:
%sql 
-- Preview the first ten rows of the raw BTC-USD parquet data,
-- ordered by the `('Date', '')` column.
SELECT *
FROM parquet.`dbfs:/mnt/demo/BTC-USD/`
ORDER BY `('Date', '')`
LIMIT 10

# 2. Data Cleaning

## 2.1 Cleaning BTC data  

In [0]:
%sql 
-- Expose the raw BTC-USD parquet columns under plain snake_case names.
-- CREATE OR REPLACE keeps this cell re-runnable: a bare CREATE VIEW fails
-- as soon as view_btc_v1 already exists from an earlier run.
CREATE OR REPLACE VIEW view_btc_v1 AS
SELECT
    `('Date', '')`           AS date,
    `('Close', 'BTC-USD')`   AS btc_close,
    `('High', 'BTC-USD')`    AS btc_high,
    `('Low', 'BTC-USD')`     AS btc_low,
    `('Open', 'BTC-USD')`    AS btc_open,
    `('Volume', 'BTC-USD')`  AS btc_volume
FROM parquet.`dbfs:/mnt/demo/BTC-USD/`


In [0]:
%sql 
-- List the tables and views visible in the current schema.
SHOW TABLES

In [0]:
%sql 
-- Switch to the `default` schema, then preview ten rows of the renamed BTC view.
USE SCHEMA default;

SELECT *
FROM view_btc_v1
LIMIT 10

In [0]:
%sql 
-- Duplicate check: list every date that occurs more than once in the BTC view.
-- An empty result means each date appears at most once.
SELECT
    date,
    COUNT(*) AS dup_count
FROM view_btc_v1
GROUP BY date
HAVING COUNT(*) > 1


In [0]:
%sql
-- Show column names and types plus extended metadata for the BTC view.
DESCRIBE TABLE EXTENDED view_btc_v1

In [0]:
%sql
-- Null check: total row count and, per column, the number of NULL values
-- in the BTC view.
SELECT
    COUNT(*)                                            AS total_rows,
    SUM(CASE WHEN date IS NULL THEN 1 ELSE 0 END)       AS null_date,
    SUM(CASE WHEN btc_close IS NULL THEN 1 ELSE 0 END)  AS null_btc_close,
    SUM(CASE WHEN btc_high IS NULL THEN 1 ELSE 0 END)   AS null_btc_high,
    SUM(CASE WHEN btc_low IS NULL THEN 1 ELSE 0 END)    AS null_btc_low,
    SUM(CASE WHEN btc_open IS NULL THEN 1 ELSE 0 END)   AS null_btc_open,
    SUM(CASE WHEN btc_volume IS NULL THEN 1 ELSE 0 END) AS null_btc_volume
FROM view_btc_v1

In [0]:
%sql
-- Materialize the cleaned BTC table: cast the date column to DATE and
-- emit the price columns in open/high/low/close/volume order.
CREATE OR REPLACE TABLE cleaned_btc AS
SELECT
    CAST(date AS DATE) AS date,
    btc_open,
    btc_high,
    btc_low,
    btc_close,
    btc_volume
FROM view_btc_v1
ORDER BY date

## 2.2 Cleaning ETH data 


In [0]:
%sql 
-- Expose the raw ETH-USD parquet columns under plain snake_case names.
-- CREATE OR REPLACE keeps this cell re-runnable: a bare CREATE VIEW fails
-- as soon as view_eth_v1 already exists from an earlier run.
CREATE OR REPLACE VIEW view_eth_v1 AS
SELECT
    `('Date', '')`           AS date,
    `('Close', 'ETH-USD')`   AS eth_close,
    `('High', 'ETH-USD')`    AS eth_high,
    `('Low', 'ETH-USD')`     AS eth_low,
    `('Open', 'ETH-USD')`    AS eth_open,
    `('Volume', 'ETH-USD')`  AS eth_volume
FROM parquet.`dbfs:/mnt/demo/ETH-USD/`

In [0]:
%sql 
-- Switch to the `default` schema, then preview ten rows of the renamed ETH view.
USE SCHEMA default;

SELECT *
FROM view_eth_v1
LIMIT 10

In [0]:
%sql 
-- Duplicate check: list every date that occurs more than once in the ETH view.
-- An empty result means each date appears at most once.
SELECT
    date,
    COUNT(*) AS dup_count
FROM view_eth_v1
GROUP BY date
HAVING COUNT(*) > 1


In [0]:
%sql
-- Null check: total row count and, per column, the number of NULL values
-- in the ETH view.
SELECT
    COUNT(*)                                            AS total_rows,
    SUM(CASE WHEN date IS NULL THEN 1 ELSE 0 END)       AS null_date,
    SUM(CASE WHEN eth_close IS NULL THEN 1 ELSE 0 END)  AS null_eth_close,
    SUM(CASE WHEN eth_high IS NULL THEN 1 ELSE 0 END)   AS null_eth_high,
    SUM(CASE WHEN eth_low IS NULL THEN 1 ELSE 0 END)    AS null_eth_low,
    SUM(CASE WHEN eth_open IS NULL THEN 1 ELSE 0 END)   AS null_eth_open,
    SUM(CASE WHEN eth_volume IS NULL THEN 1 ELSE 0 END) AS null_eth_volume
FROM view_eth_v1

In [0]:
%sql
-- ETH has 2776 rows, which does not match the BTC row count: on Yahoo Finance
-- the ETH series only starts on 2017-11-09. List the view in date order to
-- inspect the covered range.
SELECT *
FROM view_eth_v1
ORDER BY date

In [0]:
%sql
-- Materialize the cleaned ETH table: cast the date column to DATE and
-- emit the price columns in open/high/low/close/volume order.
CREATE OR REPLACE TABLE cleaned_eth AS
SELECT
    CAST(date AS DATE) AS date,
    eth_open,
    eth_high,
    eth_low,
    eth_close,
    eth_volume
FROM view_eth_v1
ORDER BY date

## 2.3 Cleaning LTC data 

In [0]:
%sql 
-- Expose the raw LTC-USD parquet columns under plain snake_case names.
-- CREATE OR REPLACE keeps this cell re-runnable: a bare CREATE VIEW fails
-- as soon as view_ltc_v1 already exists from an earlier run.
CREATE OR REPLACE VIEW view_ltc_v1 AS
SELECT
    `('Date', '')`           AS date,
    `('Close', 'LTC-USD')`   AS ltc_close,
    `('High', 'LTC-USD')`    AS ltc_high,
    `('Low', 'LTC-USD')`     AS ltc_low,
    `('Open', 'LTC-USD')`    AS ltc_open,
    `('Volume', 'LTC-USD')`  AS ltc_volume
FROM parquet.`dbfs:/mnt/demo/LTC-USD/`

In [0]:
%sql 
-- Switch to the `default` schema, then preview ten rows of the renamed LTC view.
USE SCHEMA default;

SELECT *
FROM view_ltc_v1
LIMIT 10

In [0]:
%sql 
-- Duplicate check: list every date that occurs more than once in the LTC view.
-- An empty result means each date appears at most once.
SELECT
    date,
    COUNT(*) AS dup_count
FROM view_ltc_v1
GROUP BY date
HAVING COUNT(*) > 1

In [0]:
%sql
-- Null check: total row count and, per column, the number of NULL values
-- in the LTC view.
SELECT
    COUNT(*)                                            AS total_rows,
    SUM(CASE WHEN date IS NULL THEN 1 ELSE 0 END)       AS null_date,
    SUM(CASE WHEN ltc_close IS NULL THEN 1 ELSE 0 END)  AS null_ltc_close,
    SUM(CASE WHEN ltc_high IS NULL THEN 1 ELSE 0 END)   AS null_ltc_high,
    SUM(CASE WHEN ltc_low IS NULL THEN 1 ELSE 0 END)    AS null_ltc_low,
    SUM(CASE WHEN ltc_open IS NULL THEN 1 ELSE 0 END)   AS null_ltc_open,
    SUM(CASE WHEN ltc_volume IS NULL THEN 1 ELSE 0 END) AS null_ltc_volume
FROM view_ltc_v1

In [0]:
%sql
-- Materialize the cleaned LTC table: cast the date column to DATE and
-- emit the price columns in open/high/low/close/volume order.
CREATE OR REPLACE TABLE cleaned_ltc AS
SELECT
    CAST(date AS DATE) AS date,
    ltc_open,
    ltc_high,
    ltc_low,
    ltc_close,
    ltc_volume
FROM view_ltc_v1
ORDER BY date

## 2.4 Cleaning XRP data 


In [0]:
%sql
-- Expose the raw XRP-USD parquet columns under plain snake_case names.
-- CREATE OR REPLACE keeps this cell re-runnable: a bare CREATE VIEW fails
-- as soon as view_xrp_v1 already exists from an earlier run.
CREATE OR REPLACE VIEW view_xrp_v1 AS
SELECT
    `('Date', '')`           AS date,
    `('Close', 'XRP-USD')`   AS xrp_close,
    `('High', 'XRP-USD')`    AS xrp_high,
    `('Low', 'XRP-USD')`     AS xrp_low,
    `('Open', 'XRP-USD')`    AS xrp_open,
    `('Volume', 'XRP-USD')`  AS xrp_volume
FROM parquet.`dbfs:/mnt/demo/XRP-USD/`;

In [0]:
%sql
-- Duplicate check: list every date that occurs more than once in the XRP view.
-- An empty result means each date appears at most once.
SELECT
    date,
    COUNT(*) AS dup_count
FROM view_xrp_v1
GROUP BY date
HAVING COUNT(*) > 1;

In [0]:
%sql
-- Null check: total row count and, per column, the number of NULL values
-- in the XRP view.
SELECT
    COUNT(*)                                            AS total_rows,
    SUM(CASE WHEN date IS NULL THEN 1 ELSE 0 END)       AS null_date,
    SUM(CASE WHEN xrp_close IS NULL THEN 1 ELSE 0 END)  AS null_xrp_close,
    SUM(CASE WHEN xrp_high IS NULL THEN 1 ELSE 0 END)   AS null_xrp_high,
    SUM(CASE WHEN xrp_low IS NULL THEN 1 ELSE 0 END)    AS null_xrp_low,
    SUM(CASE WHEN xrp_open IS NULL THEN 1 ELSE 0 END)   AS null_xrp_open,
    SUM(CASE WHEN xrp_volume IS NULL THEN 1 ELSE 0 END) AS null_xrp_volume
FROM view_xrp_v1;


In [0]:
%sql
-- Inspect the XRP date range by listing the view in date order.
-- Noted result: the data starts on 2017-11-09 and has 2776 rows.
SELECT *
FROM view_xrp_v1
ORDER BY date;

In [0]:
%sql
-- Materialize the cleaned XRP table: cast the date column to DATE and
-- emit the price columns in open/high/low/close/volume order.
CREATE OR REPLACE TABLE cleaned_xrp AS
SELECT
    CAST(date AS DATE) AS date,
    xrp_open,
    xrp_high,
    xrp_low,
    xrp_close,
    xrp_volume
FROM view_xrp_v1
ORDER BY date;

## 2.5 Cleaning SP500 data 

In [0]:
%sql 
-- Expose the raw ^GSPC (S&P 500) parquet columns under plain snake_case names.
-- CREATE OR REPLACE keeps this cell re-runnable: a bare CREATE VIEW fails
-- as soon as view_sp500_v1 already exists from an earlier run.
CREATE OR REPLACE VIEW view_sp500_v1 AS
SELECT
    `('Date', '')`         AS date,
    `('Close', '^GSPC')`   AS sp_close,
    `('High', '^GSPC')`    AS sp_high,
    `('Low', '^GSPC')`     AS sp_low,
    `('Open', '^GSPC')`    AS sp_open,
    `('Volume', '^GSPC')`  AS sp_volume
FROM parquet.`dbfs:/mnt/demo/^GSPC/`;

In [0]:
%sql 
-- Duplicate check: list every date that occurs more than once in the S&P 500 view.
-- An empty result means each date appears at most once.
SELECT
    date,
    COUNT(*) AS dup_count
FROM view_sp500_v1
GROUP BY date
HAVING COUNT(*) > 1;

In [0]:
%sql
-- Null check for the S&P 500 view. total_rows came out as 2124 here; the
-- lower row count is suspected to be caused by days on which the market is closed.
SELECT
    COUNT(*)                                           AS total_rows,
    SUM(CASE WHEN date IS NULL THEN 1 ELSE 0 END)      AS null_date,
    SUM(CASE WHEN sp_close IS NULL THEN 1 ELSE 0 END)  AS null_sp_close,
    SUM(CASE WHEN sp_high IS NULL THEN 1 ELSE 0 END)   AS null_sp_high,
    SUM(CASE WHEN sp_low IS NULL THEN 1 ELSE 0 END)    AS null_sp_low,
    SUM(CASE WHEN sp_open IS NULL THEN 1 ELSE 0 END)   AS null_sp_open,
    SUM(CASE WHEN sp_volume IS NULL THEN 1 ELSE 0 END) AS null_sp_volume
FROM view_sp500_v1


In [0]:
%sql 
-- List the S&P 500 view in date order. Noted result: there are no rows for
-- weekends, e.g. 2017-01-07 and 2017-01-08.
SELECT *
FROM view_sp500_v1
ORDER BY date;

In [0]:
%sql
-- Cleaned S&P 500 view: cast the date column to DATE and emit the price
-- columns in open/high/low/close/volume order.
CREATE OR REPLACE VIEW cleaned_sp500_v2 AS
SELECT
    CAST(date AS DATE) AS date,
    sp_open,
    sp_high,
    sp_low,
    sp_close,
    sp_volume
FROM view_sp500_v1

In [0]:
%sql
-- Calendar view: one row per day from 2017-01-01 through 2025-06-15, inclusive.
CREATE OR REPLACE VIEW corrected_date AS
SELECT
    EXPLODE(
        SEQUENCE(DATE '2017-01-01', DATE '2025-06-15', INTERVAL 1 DAY)
    ) AS corrected_date;


In [0]:
%sql
-- Inspect the generated calendar rows.
SELECT *
FROM corrected_date

In [0]:
%sql 
-- Preview the calendar left-joined to the S&P 500 view; calendar dates
-- without a matching S&P 500 row show NULL in the s.* columns.
SELECT
    c.*,
    s.*
FROM corrected_date AS c
LEFT JOIN cleaned_sp500_v2 AS s
    ON c.corrected_date = s.date
LIMIT 20

In [0]:
%sql 
-- Forward-fill the S&P 500 columns across every day of corrected_date and
-- store the result as the cleaned_sp table. For each calendar date, every
-- sp_* column takes the latest non-NULL value at or before that date
-- (LAST(..., TRUE) ignores NULLs); dates before the first matched row stay NULL.
CREATE OR REPLACE TABLE cleaned_sp AS
SELECT
    c.corrected_date                         AS date,
    LAST(s.sp_open, TRUE) OVER fill_forward   AS sp_open,
    LAST(s.sp_high, TRUE) OVER fill_forward   AS sp_high,
    LAST(s.sp_low, TRUE) OVER fill_forward    AS sp_low,
    LAST(s.sp_close, TRUE) OVER fill_forward  AS sp_close,
    LAST(s.sp_volume, TRUE) OVER fill_forward AS sp_volume
FROM corrected_date AS c
LEFT JOIN cleaned_sp500_v2 AS s
    ON c.corrected_date = s.date
-- Shared frame: everything from the first calendar row up to the current one.
WINDOW fill_forward AS (
    ORDER BY c.corrected_date
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
)
ORDER BY c.corrected_date



## 2.6 Cleaning Hash Rate 

In [0]:
%sql 
-- Expose the raw hash-rate parquet columns under lowercase names.
-- CREATE OR REPLACE keeps this cell re-runnable: a bare CREATE VIEW fails
-- whenever view_hash already exists (e.g. a run that stopped before the
-- final clean-up cell dropped it).
CREATE OR REPLACE VIEW view_hash AS
SELECT
    `Date`     AS date,
    `HashRate` AS hashrate
FROM parquet.`dbfs:/mnt/demo/hashrate/`;

In [0]:
%sql
-- Preview ten rows of the hash-rate view.
SELECT *
FROM view_hash
LIMIT 10

In [0]:
%sql 
-- Duplicate check: list every date that occurs more than once in the hash-rate view.
-- An empty result means each date appears at most once.
SELECT
    date,
    COUNT(*) AS dup_count
FROM view_hash
GROUP BY date
HAVING COUNT(*) > 1;

In [0]:
%sql
-- Null check: total row count and the number of NULL dates / hash rates.
SELECT
    COUNT(*)                                          AS total_rows,
    SUM(CASE WHEN date IS NULL THEN 1 ELSE 0 END)     AS null_date,
    SUM(CASE WHEN hashrate IS NULL THEN 1 ELSE 0 END) AS null_hash
FROM view_hash

In [0]:
%sql
-- Materialize the cleaned hash-rate table with the date column cast to DATE.
CREATE OR REPLACE TABLE cleaned_hashrate AS
SELECT
    CAST(date AS DATE) AS date,
    hashrate
FROM view_hash

In [0]:
%sql

-- Merge all cleaned sources onto the BTC dates. BTC is the driving table, so
-- dates missing from another source yield NULLs in that source's columns.
-- btc_return is the percentage change of btc_close versus the previous row
-- in date order; it is NULL on the first row, which has no previous close.
CREATE OR REPLACE VIEW view_merged AS
SELECT
    b.date,
    ((b.btc_close - LAG(b.btc_close, 1) OVER by_day) / LAG(b.btc_close, 1) OVER by_day) * 100 AS btc_return,
    b.btc_open,
    b.btc_high,
    b.btc_low,
    b.btc_close,
    b.btc_volume,
    e.eth_open,
    e.eth_high,
    e.eth_low,
    e.eth_close,
    e.eth_volume,
    l.ltc_open,
    l.ltc_high,
    l.ltc_low,
    l.ltc_close,
    l.ltc_volume,
    x.xrp_open,
    x.xrp_high,
    x.xrp_low,
    x.xrp_close,
    x.xrp_volume,
    s.sp_open,
    s.sp_high,
    s.sp_low,
    s.sp_close,
    s.sp_volume,
    h.hashrate
FROM cleaned_btc AS b
LEFT JOIN cleaned_eth AS e
    ON b.date = e.date
LEFT JOIN cleaned_ltc AS l
    ON b.date = l.date
LEFT JOIN cleaned_xrp AS x
    ON b.date = x.date
LEFT JOIN cleaned_sp AS s
    ON b.date = s.date
LEFT JOIN cleaned_hashrate AS h
    ON b.date = h.date
WINDOW by_day AS (ORDER BY b.date)




In [0]:
%sql
-- Preview ten merged rows that have ETH data.
SELECT *
FROM view_merged
WHERE eth_close IS NOT NULL
LIMIT 10

In [0]:
%sql
-- Mean and standard deviation of the daily BTC return, skipping NULL returns.
SELECT
    AVG(btc_return)    AS avg,
    STDDEV(btc_return) AS std
FROM view_merged
WHERE btc_return IS NOT NULL

In [0]:
%sql 
-- Build the modelling table: attach the following row's btc_return (in date
-- order) as next_day_btc_return, then keep only rows that have both that
-- target and ETH data. The LEAD is computed over all of view_merged before
-- the filter is applied.
CREATE OR REPLACE TABLE cleaned_merged AS
WITH with_target AS (
    SELECT
        *,
        LEAD(btc_return, 1) OVER (ORDER BY date) AS next_day_btc_return
    FROM view_merged
)
SELECT *
FROM with_target
WHERE next_day_btc_return IS NOT NULL
  AND eth_close IS NOT NULL

In [0]:
%sql 
-- List the tables and views visible in the current schema.
SHOW TABLES

In [0]:
%sql 
-- Preview ten rows of the merged modelling table.
SELECT *
FROM cleaned_merged
LIMIT 10

In [0]:
# Load the merged table into pandas in chronological order.
# Reading a table gives no row-order guarantee, while the rolling mean,
# shift and positional slices later in this notebook all assume the rows
# are sorted by date, oldest first — so sort explicitly before converting.
df = spark.table("cryptoworkspace.default.cleaned_merged").orderBy("date")
padf = df.toPandas()

In [0]:
# Show the first five rows of the pandas frame.
padf.head(n=5)

In [0]:
# Summary statistics with the default quartiles.
padf.describe(percentiles=[0.25, 0.5, 0.75])

In [0]:
# Trailing 30-row mean of next_day_btc_return; the first 29 rows are NaN
# because a full 30-row window is required.
padf["ma30_btc_return"] = padf["next_day_btc_return"].rolling(30).mean()

In [0]:
# Mean of next_day_btc_return over positional rows 29..58 (30 rows).
# Value noted in the original notebook: 2.784457
padf["next_day_btc_return"].iloc[29:59].mean()

In [0]:
# Display positional rows 29..58.
padf.iloc[29:59]

In [0]:
# Next-30-day average return: take the trailing 30-row mean from 29 rows
# ahead, i.e. the mean of next_day_btc_return over this row and the 29 rows
# after it. The last 29 rows become NaN.
padf["next30d_average_return"] = padf["ma30_btc_return"].shift(periods=-29)

In [0]:
# Display positional rows 29..58 again, now including the shifted column.
padf.iloc[29:59]

In [0]:
# Confirm the frame type. The original referenced `pdf`, a name never
# defined in this notebook (NameError); the pandas frame is `padf`.
type(padf)

In [0]:
# Summary statistics with the default quartiles, now covering the derived columns.
padf.describe(percentiles=[0.25, 0.5, 0.75])

In [0]:
# Display the first 30 rows.
padf.iloc[:30]

In [0]:
# Convert the full pandas frame (including derived columns) back to Spark
# and overwrite the semi_cleaned_whole table with it.
df = spark.createDataFrame(padf)
(
    df.write
    .mode("overwrite")
    .saveAsTable("cryptoworkspace.default.semi_cleaned_whole")
)

In [0]:
# Mean of next_day_btc_return over the first 30 rows.
padf["next_day_btc_return"].iloc[:30].mean()

In [0]:
# Drop the intermediate return columns, leaving the remaining columns
# (including next30d_average_return) in padf_xy.
padf_xy = padf.drop(
    ["ma30_btc_return", "next_day_btc_return", "btc_return"],
    axis="columns",
)

In [0]:
# Show the first five rows of the reduced frame.
padf_xy.head(n=5)

In [0]:
# Keep only rows in which no column is missing.
padf_cleaned = padf_xy.dropna(axis=0, how="any")

In [0]:
# Summary statistics of the fully populated rows, with the default quartiles.
padf_cleaned.describe(percentiles=[0.25, 0.5, 0.75])

In [0]:
# Show the last five rows of the fully populated frame.
padf_cleaned.tail(n=5)

In [0]:
# Convert the fully populated frame back to Spark and overwrite the
# cleaned_xy_aml table with it.
df = spark.createDataFrame(padf_cleaned)
(
    df.write
    .mode("overwrite")
    .saveAsTable("cryptoworkspace.default.cleaned_xy_aml")
)

In [0]:
%sql 
-- Clean up three helper views created earlier in this notebook.
-- IF EXISTS keeps the cell from failing when a view is already gone,
-- e.g. when this cell is run twice or after a partial run.
DROP VIEW IF EXISTS corrected_date;
DROP VIEW IF EXISTS view_hash;
DROP VIEW IF EXISTS view_merged