## Download file and load file to dhfs

### download

In [22]:
import requests
import json
import pandas as pd
import urllib.request
import urllib.error
import os
import logging
import sys
import datetime

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(levelname)s %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('download.log'),
    ]
)
    
def download_file(df, links, name, downloadHistorical):
    link = links[name]['link']
    field = links[name]['column']
    
    if not downloadHistorical: # download uploaded file today. It have data of yesterday
        # check if dont have new data
        yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
        date_string = yesterday.strftime("%d %b %Y")
        if df['Date'][0] != date_string: # 0 is latest index of uploaded data
            logging.info(f"No new data today, at {datetime.datetime.now()}")
            return "error"

        # if have new data
        filename = df[field][0]
        try:
            with urllib.request.urlopen(link.format(df['key'][0]), timeout=10) as response, open(filename, 'wb') as out_file:
                data = response.read()
                out_file.write(data)
                logging.info(f'Download file today: {filename} successfully')
                return filename
        except urllib.error.URLError as e:
            logging.error(f'ConnectionError. Error occurred while downloading {filename}, type = {name}, id = {df["key"][0]}')
        except Exception as e:
            logging.error(f'TimeoutError. Error occurred while downloading {filename}, type = {name}, id = {df["key"][0]}')
            
    else: # download historical files, (files not on today)
        beginId = 1
        yesterday = datetime.datetime.now() - datetime.timedelta(days=2)
        date_string = yesterday.strftime("%d %b %Y")
        if df['Date'][0] != date_string:
            beginId = 0 #if latest file is not on today, download from id = 0
            
        cwd = os.getcwd()
        for id in range(beginId, len(df)):
            filename = df[field][id]
            filepath = os.path.join(cwd, filename)
            if os.path.exists(filepath): # skip exists file
                logging.info(f'File exists. Skipping download of {filename}')
                continue
            else:
                try:
                    with urllib.request.urlopen(link.format(df['key'][id]), timeout=10) as response, open(filename, 'wb') as out_file:
                        data = response.read()
                        out_file.write(data)
                        logging.info(f'Download file: {filename} successfully')
                except urllib.error.URLError as e:
                    logging.error(f'ConnectionError. Error occurred while downloading {filename}, type = {name}, id = {df["key"][id]}')
                    continue
                except Exception as e:
                    logging.error(f'TimeoutError. Error occurred while downloading {filename}, type = {name}, id = {df["key"][id]}')
                    continue
        
'''
this function send api get list files recent and related information as id, uploaded date, fileName
Using json_normalize to convert from json type to dataframe
'''
def getDataFiles():
    url = "https://api3.sgx.com/infofeed/Apps?A=COW_Tickdownload_Content&B=TimeSalesData&C_T=20&noCache=1689701183686"
    try:
        response = requests.request("GET", url)
    except requests.ConnectionError as e:
        pass
        # logging.error('ConnectionError. Error occurred while read api getDataFile')
        # sys.exit(1)
    data = json.loads(response.text)
    df = pd.json_normalize(data['items'])
    df = df.dropna(how='any')
    return df

In [23]:
df = getDataFiles()

downloadHistorical = False

with open('link.json', 'r') as f:
    links = json.load(f)
    
file_name = download_file(df, links, "Tick", downloadHistorical)

2023-08-09 23:48:55,131 DEBUG Starting new HTTPS connection (1): api3.sgx.com:443
2023-08-09 23:48:55,971 DEBUG https://api3.sgx.com:443 "GET /infofeed/Apps?A=COW_Tickdownload_Content&B=TimeSalesData&C_T=20&noCache=1689701183686 HTTP/1.1" 200 385
2023-08-09 23:49:00,977 INFO Download file today: WEBPXTICK_DT-20230808.zip successfully


In [24]:
print(file_name)

WEBPXTICK_DT-20230808.zip


In [25]:
import zipfile
with zipfile.ZipFile(file_name, 'r') as zip_ref:
    zip_ref.extractall()

In [26]:
os.remove(file_name)

In [27]:
file_name = file_name[:-3] + "csv"
print(file_name)

WEBPXTICK_DT-20230808.csv


### upload file to hdfs

In [14]:
import os 

import pandas as pd

from subprocess import PIPE, Popen

# create path to your username on hdfs
hdfs_path = os.path.join(os.sep, 'sgx_data/')
print(hdfs_path)

/sgx_data/


In [30]:
import subprocess

command = 'echo "100600" | sudo -S docker exec -t hdfs_docker hdfs dfs -mkdir /sgx_data/'

# Execute the command in the terminal
subprocess.run(command, shell=True)

[sudo] password for chinhnv: 

CompletedProcess(args='echo "100600" | sudo -S docker exec -t hdfs_docker hdfs dfs -mkdir /sgx_data/', returncode=0)

In [36]:
sourcePath = '/data/'+file_name
print(sourcePath)

/data/WEBPXTICK_DT-20230808.csv


In [40]:
putCommand = f'echo "100600" | sudo -S docker exec -t hdfs_docker hdfs dfs -put /data/{file_name} /sgx_data/'
subprocess.run(putCommand, shell=True)

[sudo] password for chinhnv: 

CompletedProcess(args='echo "100600" | sudo -S docker exec -t hdfs_docker hdfs dfs -put /data/WEBPXTICK_DT-20230808.csv /sgx_data/', returncode=0)

In [41]:
os.remove(file_name)

### read file hdfs

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]") \
            .appName("loadData") \
            .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
            .getOrCreate()

your 131072x1 screen size is bogus. expect trouble
23/08/12 23:26:24 WARN Utils: Your hostname, DESKTOP-3QDHGK7 resolves to a loopback address: 127.0.1.1; using 172.23.218.23 instead (on interface eth0)
23/08/12 23:26:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/chinhnv/.ivy2/cache
The jars for the packages stored in: /home/chinhnv/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-55987a39-a18f-4df2-8d07-ba72abda1beb;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
:: resolution report :: resolve 252ms :: artifacts dl 14ms
	:: modules in use:
	org.mongodb#bson;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-core;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-sync;4.0.5 from central in [default]
	org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifac

In [2]:
file_name = 'WEBPXTICK_DT-20230810.csv'

In [4]:

df = spark.read.csv(f"hdfs://localhost:8020/sgx_data/{file_name}", header=True, inferSchema=True)

                                                                                

In [5]:
df.count()

                                                                                

3590186

In [None]:
df.head(10)

In [None]:
df.columns

In [None]:
df = df.select('Price', 'Volume')

In [None]:
from pyspark.sql import functions as F
df = df.withColumn('idTransaction', F.monotonically_increasing_id())

In [None]:
df.head(10)

### Spark write to clickhouse

In [None]:
uri = "jdbc:clickhouse://{}:{}/{}".format("localhost", 8123, "SgxTrading")

In [None]:
print(uri)

In [None]:

factChiDinhDv = {
            'url': uri,
            'dbtable' : 'DerivatiesTrading',
            'isolationLevel' : 'NONE'
        }
df.write.mode("append").format("jdbc").options(**factChiDinhDv).save()

# Test

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]") \
            .appName("loadData") \
            .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
            .getOrCreate()

In [None]:
uri = "jdbc:clickhouse://{}:{}/{}".format("localhost", 8123, "SgxTrading")
factChiDinhDv = {
            'url': uri,
            'dbtable' : 'DerivatiesTrading',
            'isolationLevel' : 'NONE'
        }

In [None]:
df.write.mode("append").format("jdbc").options(**factChiDinhDv).save()

In [None]:
df = spark.read.format("jdbc").options(**factChiDinhDv).load()

In [None]:
df.printSchema()

### INSERT DIM

In [None]:
import clickhouse_connect

client = clickhouse_connect.get_client(host='localhost', port=8123, username='', password='')

In [None]:
#dim action
row1 = ['A', "Ask"]
row2 = ['B', "Bid"]
row3 = ['T', "Traded"]
data = [row1, row2, row3]
client.insert('SgxTrading.Dim_Action', data, column_names=['Action_Key', 'Action'])

<clickhouse_connect.driver.summary.QuerySummary at 0x7fc25a449660>

In [None]:
#dim contract type
row1 = ['F', "Futures"]
row2 = ['P', "Put"]
row3 = ['C', "Call"]
data = [row1, row2, row3]
client.insert('SgxTrading.Dim_ContractType', data, column_names=['ContractType_Key', 'ContractType'])

<clickhouse_connect.driver.summary.QuerySummary at 0x7fc25a4496c0>

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]") \
            .appName("loadData") \
            .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
            .getOrCreate()
file_name = 'WEBPXTICK_DT-20230803.csv'
df = spark.read.csv(f"hdfs://localhost:8020/sgx_data/{file_name}", header=True, inferSchema=True)


                                                                                

In [None]:
df2 = df.select('Comm').drop_duplicates()

In [None]:
df2.show()



+-----+
| Comm|
+-----+
|  AMF|
|MCNWO|
|   NS|
|MCNJP|
|   NK|
|   EE|
|  MKP|
|   CN|
|  RBF|
|   TU|
|   UC|
|  IDR|
|MCNAX|
|  MUC|
|  TWD|
|   AU|
|  INX|
|  SND|
|   NC|
|  SRT|
+-----+
only showing top 20 rows



                                                                                

In [None]:
df2.count()

                                                                                

103

In [14]:
from pyspark.sql import functions as F
df2 = df2.withColumn("Commodity_Key", F.monotonically_increasing_id())\
        .withColumnRenamed("Comm", "CommodityCode")

In [None]:
df2.show()



+-------------+-------------+
|CommodityCode|Commodity_Key|
+-------------+-------------+
|          AMF|            0|
|        MCNWO|            1|
|           NS|            2|
|        MCNJP|            3|
|           NK|            4|
|           EE|            5|
|          MKP|            6|
|           CN|            7|
|          RBF|            8|
|           TU|            9|
|           UC|           10|
|          IDR|           11|
|        MCNAX|           12|
|          MUC|           13|
|          TWD|           14|
|           AU|           15|
|          INX|           16|
|          SND|           17|
|           NC|           18|
|          SRT|           19|
+-------------+-------------+
only showing top 20 rows



                                                                                

In [9]:
uri = "jdbc:clickhouse://{}:{}/{}".format("localhost", 8123, "SgxTrading")
dimCommodity = {
            'url': uri,
            'dbtable' : 'Dim_Commodity',
            'isolationLevel' : 'NONE'
        }
# df2.write.mode("append").format("jdbc").options(**dimCommodity).save()

In [None]:
import pandas as pd

dates = pd.date_range(start='2020-01-01', end='2030-12-31', freq='D')

dateDf = pd.DataFrame({'Date_key': dates.strftime('%Y%m%d'),
                   'FullDate': dates.strftime('%Y-%m-%d'),
                   'DayOfWeek': dates.strftime('%a'),
                   'CalendarQuarter': dates.quarter.map({1: 'Q1', 2: 'Q2', 3: 'Q3', 4: 'Q4'})})

print(dateDf.head())

   Date_key    FullDate DayOfWeek CalendarQuarter
0  20200101  2020-01-01       Wed              Q1
1  20200102  2020-01-02       Thu              Q1
2  20200103  2020-01-03       Fri              Q1
3  20200104  2020-01-04       Sat              Q1
4  20200105  2020-01-05       Sun              Q1


In [None]:
spark_DateDf = spark.createDataFrame(dateDf)

In [None]:
spark_DateDf.show()

+--------+----------+---------+---------------+
|Date_key|  FullDate|DayOfWeek|CalendarQuarter|
+--------+----------+---------+---------------+
|20200101|2020-01-01|      Wed|             Q1|
|20200102|2020-01-02|      Thu|             Q1|
|20200103|2020-01-03|      Fri|             Q1|
|20200104|2020-01-04|      Sat|             Q1|
|20200105|2020-01-05|      Sun|             Q1|
|20200106|2020-01-06|      Mon|             Q1|
|20200107|2020-01-07|      Tue|             Q1|
|20200108|2020-01-08|      Wed|             Q1|
|20200109|2020-01-09|      Thu|             Q1|
|20200110|2020-01-10|      Fri|             Q1|
|20200111|2020-01-11|      Sat|             Q1|
|20200112|2020-01-12|      Sun|             Q1|
|20200113|2020-01-13|      Mon|             Q1|
|20200114|2020-01-14|      Tue|             Q1|
|20200115|2020-01-15|      Wed|             Q1|
|20200116|2020-01-16|      Thu|             Q1|
|20200117|2020-01-17|      Fri|             Q1|
|20200118|2020-01-18|      Sat|         

In [None]:
dimDate = {
            'url': uri,
            'dbtable' : 'Dim_Date',
            'isolationLevel' : 'NONE'
        }
spark_DateDf.write.mode("append").format("jdbc").options(**dimDate).save()

In [None]:
#dim month
MonthCode = ["F", "G", "H", "J", "K", "M", "N", "Q", "U", "V", "X", "Z"]
MonthName = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
id = [i for i in range(12)]
Month = [i for i in range(1, 13)]

data = list(zip(MonthCode, MonthName, id, Month))

MonthDf = pd.DataFrame(data, columns=['MonthCode', 'MonthName', 'Month_Key', "Month"])

spark_MonthDf = spark.createDataFrame(MonthDf)

In [None]:
dimMonth = {
            'url': uri,
            'dbtable' : 'Dim_Month',
            'isolationLevel' : 'NONE'
        }
spark_MonthDf.write.mode("append").format("jdbc").options(**dimMonth).save()

In [None]:
times = pd.date_range(start='00:00:00', end='23:59:59', freq='s').time

timeDf = pd.DataFrame({'Time_Key': [int(time.strftime('%H%M%S')) for time in times],  # Chuyển đổi thành dạng int (hhmmss)
                   'FullTime': [time.strftime('%H:%M:%S') for time in times],  # Chuyển đổi thành dạng hh:mm:ss
                   'Hour': [time.hour for time in times],  # Lấy giờ
                   'Minute': [time.minute for time in times],  # Lấy phút
                   'Second': [time.second for time in times]})  # Lấy giây

spark_TimeDf = spark.createDataFrame(timeDf)

In [None]:
dimTime = {
            'url': uri,
            'dbtable' : 'Dim_Time',
            'isolationLevel' : 'NONE'
        }
spark_TimeDf.write.mode("append").format("jdbc").options(**dimTime).save()

23/08/08 02:13:59 WARN TaskSetManager: Stage 38 contains a task of very large size (2069 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [None]:
years = range(2000, 2051)

yearDf = pd.DataFrame({'Year_Key': range(len(years)),
                   'Year': years})

spark_YearDf = spark.createDataFrame(yearDf)

In [None]:
dimYear = {
            'url': uri,
            'dbtable' : 'Dim_Year',
            'isolationLevel' : 'NONE'
        }
spark_YearDf.write.mode("append").format("jdbc").options(**dimYear).save()

# load fact

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]") \
            .appName("loadData") \
            .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
            .getOrCreate()
file_name = 'WEBPXTICK_DT-20230803.csv'
df = spark.read.csv(f"hdfs://localhost:8020/sgx_data/{file_name}", header=True, inferSchema=True)


your 131072x1 screen size is bogus. expect trouble
23/08/08 16:32:10 WARN Utils: Your hostname, DESKTOP-3QDHGK7 resolves to a loopback address: 127.0.1.1; using 172.23.218.23 instead (on interface eth0)
23/08/08 16:32:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/chinhnv/.ivy2/cache
The jars for the packages stored in: /home/chinhnv/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-d55e431f-b8b4-44bb-bf0c-6064749dbe43;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
:: resolution report :: resolve 331ms :: artifacts dl 16ms
	:: modules in use:
	org.mongodb#bson;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-core;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-sync;4.0.5 from central in [default]
	org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifac

In [6]:
newComm = df.select('Comm').drop_duplicates().withColumnRenamed("Comm", "CommodityCode")

In [7]:
newComm.show()



+-------------+
|CommodityCode|
+-------------+
|          AMF|
|        MCNWO|
|           NS|
|        MCNJP|
|           NK|
|           EE|
|          MKP|
|           CN|
|          RBF|
|           TU|
|           UC|
|          IDR|
|        MCNAX|
|          MUC|
|          TWD|
|           AU|
|          INX|
|          SND|
|           NC|
|          SRT|
+-------------+
only showing top 20 rows



                                                                                

In [10]:
dimComm = {
            'url': uri,
            'dbtable' : 'Dim_Commodity',
            'isolationLevel' : 'NONE'
        }
comm = spark.read.format("jdbc").options(**dimComm).load()

In [11]:
comm = comm.select("CommodityCode")

In [12]:
comm.show()

+-------------+
|CommodityCode|
+-------------+
|          AMF|
|        MCNWO|
|           NS|
|        MCNJP|
|           NK|
|           EE|
|          MKP|
|           CN|
|          RBF|
|           TU|
|           UC|
|          IDR|
|        MCNAX|
|          MUC|
|          TWD|
|           AU|
|          INX|
|          SND|
|           NC|
|          SRT|
+-------------+
only showing top 20 rows



In [13]:
comm_combined = comm.union(newComm).dropDuplicates()
newRecord = comm_combined.subtract(comm)
# Hiển thị kết quả
newRecord.show()



+-------------+
|CommodityCode|
+-------------+
+-------------+



                                                                                

In [38]:
newRecord = spark.createDataFrame(data=[("Chinh",), ("zxz",)], schema=['CommodityCode'])

In [39]:
newRecord.show()

+-------------+
|CommodityCode|
+-------------+
|        Chinh|
|          zxz|
+-------------+



In [40]:
# TODO: add check empty
newData = [(comm.count() + i, newRecord.collect()[i]['CommodityCode']) for i in range(newRecord.count())]
newDataDf = spark.createDataFrame(data=newData, schema=["Commodity_Key", 'CommodityCode'])

In [41]:
uri = "jdbc:clickhouse://{}:{}/{}".format("localhost", 8123, "SgxTrading")
dimCommodity = {
            'url': uri,
            'dbtable' : 'Dim_Commodity',
            'isolationLevel' : 'NONE'
        }
newDataDf.write.mode("append").format("jdbc").options(**dimCommodity).save()

### JOIN fact

In [46]:
transaction = df.select(['Comm', 'Contract_Type', 'Mth_Code', 'Year', 'Strike', 'Trade_Date', 'Log_Time', 'Price', 'Msg_Code', 'Volume'])

In [62]:
transaction.show()

+----+-------------+--------+----+------+----------+--------+------+--------+------+------+
|Comm|Contract_Type|Mth_Code|Year|Strike|Trade_Date|Log_Time| Price|Msg_Code|Volume|YearId|
+----+-------------+--------+----+------+----------+--------+------+--------+------+------+
|AJRT|            F|       Q|2023|   0.0|  20230803|   92905|2090.5|       A|    10|    23|
|AJRT|            F|       Q|2023|   0.0|  20230803|   92905|2085.5|       B|    10|    23|
|AJRT|            F|       Q|2023|   0.0|  20230803|   92935|2091.0|       A|    10|    23|
|AJRT|            F|       Q|2023|   0.0|  20230803|   92935|2086.0|       B|    10|    23|
|AJRT|            F|       Q|2023|   0.0|  20230803|   93011|2091.5|       A|    10|    23|
|AJRT|            F|       Q|2023|   0.0|  20230803|   93011|2086.5|       B|    10|    23|
|AJRT|            F|       Q|2023|   0.0|  20230803|   93014|2091.0|       A|    10|    23|
|AJRT|            F|       Q|2023|   0.0|  20230803|   93014|2086.0|       B|   

In [74]:
Comm = spark.read.format("jdbc").options(**{'url': uri, 'dbtable' : 'Dim_Commodity'}).load()
transaction = transaction.join(Comm, transaction.Comm == Comm.CommodityCode, 'left').select(df["*"], Comm['Commodity_Key'].alias("CommodityId"))

In [75]:
Month = spark.read.format("jdbc").options(**{'url': uri, 'dbtable' : 'Dim_Month'}).load()
transaction = transaction.join(Month, transaction.Mth_Code == Month.MonthCode, 'left').select(transaction["*"], Month['Month_Key'].alias("MonthId"))

In [76]:
Year = spark.read.format("jdbc").options(**{'url': uri, 'dbtable' : 'Dim_Year'}).load()
transaction = transaction.join(Year, transaction.Year == Year.Year, 'left').select(transaction["*"], Year['Year_Key'].alias("YearId"))

In [77]:
Date = spark.read.format("jdbc").options(**{'url': uri, 'dbtable' : 'Dim_Date'}).load()
transaction = transaction.join(Date, transaction.Trade_Date == Date.Date_Key, 'left').select(transaction["*"], Date['Date_Key'].alias("DateId"))

In [78]:
Time = spark.read.format("jdbc").options(**{'url': uri, 'dbtable' : 'Dim_Time'}).load()
transaction = transaction.join(Time, transaction.Log_Time == Time.Time_Key, 'left').select(transaction["*"], Time['Time_Key'].alias("LogTimeId"))

In [79]:
Action = spark.read.format("jdbc").options(**{'url': uri, 'dbtable' : 'Dim_Action'}).load()
transaction = transaction.join(Action, transaction.Msg_Code == Action.Action_Key, 'left').select(transaction["*"], Action['Action_Key'].alias("ActionId"))

In [80]:
transaction.columns

['Comm',
 'Contract_Type',
 'Mth_Code',
 'Year',
 'Strike',
 'Trade_Date',
 'Log_Time',
 'Price',
 'Msg_Code',
 'Volume',
 'CommodityId',
 'MonthId',
 'YearId',
 'DateId',
 'LogTimeId',
 'ActionId']

In [82]:
transaction = transaction.withColumnRenamed("Contract_Type", 'ContractTypeId')

In [None]:
transaction = transaction.withColumnRenamed("Strike", 'StrikePrice')

In [84]:
lastDf = transaction.select(['CommodityId', 'MonthId', 'YearId', 'DateId', 'LogTimeId', 'ActionId', 'ContractTypeId', 'StrikePrice', 'Volume'])

In [85]:
lastDf = lastDf.withColumn('IdTransaction', F.monotonically_increasing_id())

In [86]:
uri = "jdbc:clickhouse://{}:{}/{}".format("localhost", 8123, "SgxTrading")
factTrading = {
            'url': uri,
            'dbtable' : 'Fact_DerivatiesTrading',
            'isolationLevel' : 'NONE'
        }
lastDf.write.mode("append").format("jdbc").options(**factTrading).save()

                                                                                

- Airflow,
- Clickhouse
- Hdfs
- Spark

In [6]:
from airflow.models.baseoperator import BaseOperator
from airflow.models import Variable
from pyspark.sql import SparkSession
import clickhouse_connect
from pyspark.sql import functions as F
import pandas as pd

spark = SparkSession.builder.master("local[1]") \
            .appName("loadData") \
            .getOrCreate()

[[34m2023-08-13T01:06:11.713+0700[0m] {[34mctypes.py:[0m22} INFO[0m - Successfully imported ClickHouse Connect C data optimizations[0m
[[34m2023-08-13T01:06:11.724+0700[0m] {[34mjson_impl.py:[0m45} INFO[0m - Using python library for writing JSON byte strings[0m


In [7]:
fileName='WEBPXTICK_DT-20230803.csv'
df = spark.read.csv(f"hdfs://localhost:8020/sgx_data/{fileName}", header=True, inferSchema=True)
print(f"Done read file {fileName}, with {df.count()} record")



Done read file WEBPXTICK_DT-20230803.csv, with 4153692 record


                                                                                

In [8]:
uri = "jdbc:clickhouse://{}:{}/{}".format("localhost", 8123, "SgxTrading")
dimAction = spark.read.format("jdbc").options(**{'url': uri,'dbtable' : 'Dim_Action','isolationLevel' : 'NONE'}).load()

In [9]:
dimAction.count()

0