# Clean Final Dataset

In [None]:
import numpy as np
import pandas as pd
import pyspark
import sys
import databricks.koalas as ks
import time

In [None]:
# Check the Spark application name of the session this notebook runs on.
# `spark` is not created in this notebook; it is the SparkSession supplied by the
# environment (the output shows 'PySparkShell').
spark.sparkContext.appName

'PySparkShell'

In [None]:
# Enable Apache Arrow for columnar data transfer between Spark and pandas
# (Spark 3.x configuration key).
spark.conf.set(key="spark.sql.execution.arrow.pyspark.enabled", value=True)

In [None]:
# Show the runtime versions used for this run (recorded for reproducibility).
# Python version
sys.version

'3.8.10 (default, Jun  2 2021, 10:49:15) \n[GCC 9.4.0]'

In [None]:
# Spark version (the Arrow configuration key set above is the Spark 3.x name).
spark.version

'3.1.2'

In [None]:
# Treat +/-inf as missing in pandas, and allow koalas operations that combine
# columns from different DataFrames.
# NOTE(review): mode.use_inf_as_na is a pandas option; presumably koalas
# (Spark-backed) frames do not honor it — confirm before relying on isnull()
# to catch infinities in koalas frames.
pd.options.mode.use_inf_as_na = True
ks.options.compute.ops_on_diff_frames = True

### Start

In [None]:
# Start the wall-clock timer for the whole cleaning run; the elapsed total is
# printed at the end of the notebook.
starttotal = time.time()

In [None]:
# Load the long-format dataset as a koalas DataFrame: all assets are stacked
# row-wise and the 'name' column identifies which asset a row belongs to.
df = ks.read_csv(path='data/long_dataframe.csv')

In [None]:
# Distinct asset names, sorted, as a plain Python list.
names = df['name'].unique().sort_values().to_numpy().tolist()

In [None]:
# Reorder the asset list for the loop below: take 'TWD/USD' out, drop the last
# remaining name in sorted order, then put 'TWD/USD' back at the very end.
# NOTE(review): the dropped name is chosen by position (presumably
# 'shippingIndex', which gets its own cell later) — confirm. This mutates
# `names` in place, so re-running this cell without re-running the previous one
# drops one more asset.
names.remove('TWD/USD')
names.pop()
names += ['TWD/USD']

In [None]:
# for name in names:
#     leng = df[df['name'] == name].shape[0]
#     print(f'{leng} is the length of {name}')

In [None]:
start = time.time()

# Build the reference frame from one stock (1301.TW). Its surviving columns and
# trimmed row range become the template that every other asset follows below.
df1 = df[df['name']=='1301.TW']

print(df1.shape, ' Original shape')

# Drop columns that are entirely NaN for the reference stock. Compare against the
# frame's own row count instead of a hard-coded 2524, so this keeps working when
# the input date range changes.
n_rows = df1.shape[0]
assert n_rows > 0, 'reference stock 1301.TW not found in the input data'
is_all_nan = (df1.isnull().sum() == n_rows).to_numpy()
allnanlist = list(df1.columns[is_all_nan])
df1 = df1.drop(allnanlist, axis=1)

print(df1.shape, ' Later shape')

# TA-Lib indicators leave leading NaNs (e.g. a 20-day moving average has no value
# for its first 20 rows). The largest per-column NaN count within the first 101
# rows is used as the warm-up length to cut off.
baseline = int(df1.iloc[:101, :].isnull().sum().max())
# Keep only the rows after the warm-up period and reset the index.
df1 = df1.iloc[baseline:, :].reset_index(drop=True)

print(df1.shape, ' Final shape')

# Confirm no NaN values remain (count of columns containing any NaN).
print(df1.isnull().any().sum(),' Number of Nan')
# Confirm no infinite values remain. isna() is an alias of isnull(), so it would
# only repeat the NaN check; test the float columns for +/-inf explicitly.
n_inf_cols = (
    df1.select_dtypes(include=['float64', 'float32'])
    .isin([np.inf, -np.inf])
    .any()
    .sum()
)
print(n_inf_cols,' Number of Infinity')

# # Reference columns
# standCols = df1.columns
# # Reference dates
# standDate = df1['date']

end = time.time()
print(end - start)

(2524, 201)  Original shape
(2524, 192)  Later shape
(2436, 192)  Final shape
0  Number of Nan
0  Number of Infinity
45.541550636291504


In [None]:
# Remove the columns that were all-NaN on the reference stock from the full long
# frame as well, so every asset shares the reference frame's column set.
df2 = df.drop(columns=allnanlist)

In [None]:
start = time.time()

# Clean every remaining asset and stack it under the reference frame. Frames are
# collected in a list and concatenated once after the loop; calling ks.concat on
# the growing df1 in every iteration re-processes all earlier rows each time.
clean_frames = [df1]

# names[0] is the reference stock already in df1; the last two names are
# excluded from this loop.
for name in names[1:-2]:

    # Select one asset and keep only the rows after the TA-Lib warm-up period.
    dft = df2[df2['name'] == name]
    dft = dft.iloc[baseline:, :].reset_index(drop=True)

    # CrudeOil is forward-filled before the check (presumably it has gaps inside
    # the kept range, e.g. non-trading days — confirm).
    if name == 'CrudeOil':
        dft = dft.fillna(method='ffill')

    # Reject the asset if any NaN or +/-inf remains. isna() is an alias of
    # isnull(), so infinities are tested explicitly on the float columns.
    has_nan_or_inf = (
        dft.isnull().any().sum() != 0
        or dft.select_dtypes(include=['float64', 'float32'])
              .isin([np.inf, -np.inf]).any().sum() != 0
    )
    if has_nan_or_inf:
        # Stop at the first unclean asset so it can be inspected; the assets
        # collected so far are still concatenated below.
        print(f'{name} still has Nan or Infinity !!!')
        break

    clean_frames.append(dft)
    print(f'{name} are in the clean long dataset !!!')

# Single concatenation; ignore_index produces a fresh 0..N-1 index.
df1 = ks.concat(clean_frames, ignore_index=True)

print('-'*100)
print('Finish !!!')

end = time.time()
print(end - start)

1303.TW are in the clean long dataset !!!
1304.TW are in the clean long dataset !!!
1305.TW are in the clean long dataset !!!
1308.TW are in the clean long dataset !!!
1309.TW are in the clean long dataset !!!
1312.TW are in the clean long dataset !!!
1313.TW are in the clean long dataset !!!
1314.TW are in the clean long dataset !!!
1326.TW are in the clean long dataset !!!
2002.TW are in the clean long dataset !!!
2006.TW are in the clean long dataset !!!
2010.TW are in the clean long dataset !!!
2014.TW are in the clean long dataset !!!
2023.TW are in the clean long dataset !!!
2027.TW are in the clean long dataset !!!
2028.TW are in the clean long dataset !!!
2030.TW are in the clean long dataset !!!
2031.TW are in the clean long dataset !!!
2038.TW are in the clean long dataset !!!
2603.TW are in the clean long dataset !!!
2605.TW are in the clean long dataset !!!
2606.TW are in the clean long dataset !!!
2609.TW are in the clean long dataset !!!
2615.TW are in the clean long data

In [None]:
# df1.shape -> (93027, 186)

In [None]:
# df1 = ks.read_parquet('data/tem.parquet')

In [None]:
# allnanlist[:2]  # ['acos', 'asin']

In [None]:
# Drop only the all-NaN columns ['acos', 'asin'] from the full long frame; the
# other all-NaN columns of the reference stock are kept in df3.
# The slice is positional, so verify it really selects those two columns before
# dropping anything.
assert allnanlist[:2] == ['acos', 'asin'], (
    f'expected the first all-NaN columns to be acos/asin, got {allnanlist[:2]}'
)
df3 = df.drop(allnanlist[:2], axis=1)

In [None]:
start = time.time()

# Append the shipping index (rows after the TA-Lib warm-up period) to the long
# dataset.
# NOTE(review): df1 is extended in place, so re-running this cell appends the
# shipping rows a second time.
shipdf = df3[df3['name'] == 'shippingIndex'].iloc[baseline:, :]
df1 = ks.concat([df1, shipdf], ignore_index=True)

end = time.time()
print(end - start)

52.85513162612915


In [None]:
start = time.time()

# Append the TWD/USD exchange rate (rows after the TA-Lib warm-up period) to the
# long dataset.
# NOTE(review): df1 is extended in place, so re-running this cell appends the
# exchange-rate rows a second time.
twddf = df3[df3['name'] == 'TWD/USD'].iloc[baseline:, :]
df1 = ks.concat([df1, twddf], ignore_index=True)

end = time.time()
print(end - start)

44.77653646469116


In [None]:
# Write the long dataset as a single CSV part file with a header row. Spark
# creates a directory named "name.csv" that holds the part file. Spark's default
# save mode fails when the path already exists, so use overwrite to let the
# notebook be re-run end to end.
(
    df1.to_spark()
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv("name.csv")
)

In [None]:
# ndf = ks.read_csv('name.csv')

In [None]:
# ndf.shape

(96759, 199)

In [None]:
# ndf['category'].unique()

0       Steel
1     Futures
2     Plastic
3       Index
4    Shipping
5      ExRate
Name: category, dtype: object

In [None]:
# ndf[ndf['category'] == 'Index'].isnull().sum()

date                       0
category                   0
name                       0
open                    2436
high                    2436
low                     2436
close                   2436
volume                  2436
year                    2436
month                   2436
day                     2436
dayofyear               2436
weekofyear              2436
dayofweek               2436
ht_dcperiod             2436
ht_dcphase              2436
inphase                 2436
quadrature              2436
sine                    2436
leadsine                2436
ht_trendmode            2436
add                     2436
div                     2436
max23                   2436
maxindex                2436
min25                   2436
minindex                2436
min27                   2436
max28                   2436
minidx                  2436
maxidx                  2436
mult                    2436
sub                     2436
sum                     2436
atan          

In [None]:
# Total wall-clock time of the cleaning run in seconds. End minus start gives a
# positive duration; the previous start minus end printed a negative number.
endtotal = time.time()
print(endtotal - starttotal)

In [None]:
# Reference:

# 尋找缺失值位置
# np.where(np.isnan(dftt['trix'].to_numpy()))

In [None]:
# df2 = df[df['name']=='^TWII']

In [None]:
# # 保留前 100 row 空值小於等於 60% 的 column
# cond = df2.iloc[:101,:].isnull().sum()/100 <= 0.6
# df2 = df2[cond[cond == True].index.to_numpy()]

In [None]:
# # 因 TAlib 計算式的關係，前面row會有空值(像是20日均線，前20天沒資料)，故在確認前 100 row 空值最大 row 數是多少
# baseline = df2.iloc[:101,:].isnull().sum().max()

In [None]:
# # 取 baseline 以後當新的 df # index reset
# df2= df2.iloc[baseline:,:].reset_index(drop=True)

In [None]:
# np.where(df2.isnull().any().to_numpy() == True)