In [None]:
%pip install python-binance pyspark

In [None]:
from binance.client import Client
import configparser
from os import environ
from pyspark import sql
from pyspark.sql.functions import *
import time

# Read credentials and storage/catalog settings from config.ini.
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it did read, so check that result; otherwise a missing file only
# surfaces later as a confusing KeyError on the first key lookup.
CONFIG_PATH = 'config.ini'
config = configparser.ConfigParser()
if not config.read(CONFIG_PATH):
    raise FileNotFoundError(
        "Could not read '{}' (relative paths resolve against the current working directory)".format(CONFIG_PATH)
    )

# NOTE(review): 'bananceSecret' looks like a typo for 'binanceSecret'. It is left
# unchanged here because it has to match the key name used in config.ini --
# confirm, then rename both together.
api_secret = config['DEFAULT']['bananceSecret']
api_key = config['DEFAULT']['binanceApi']
warehouse = config['DEFAULT']['warehouse']
secret = config['DEFAULT']['secret']
storageAccountName  = config['DEFAULT']['storageAccountName']
hive_uri = config['DEFAULT']['hive_uri']

# Create the Binance API client (tld='us' targets the binance.us domain)
client = Client(api_key, api_secret, tld='us')

# Maven coordinates Spark must download: the Iceberg runtime plus the
# Hadoop/Azure storage connectors
ICEBERG_VERSION="0.12.0"
DEPENDENCIES="org.apache.iceberg:iceberg-spark3-runtime:{}".format(ICEBERG_VERSION)
DEPENDENCIES+=",org.apache.hadoop:hadoop-azure:3.2.0"
DEPENDENCIES+=",com.microsoft.azure:azure-storage:7.0.0"
DEPENDENCIES+=",org.apache.hadoop:hadoop-azure-datalake:3.2.0"

# Pass the packages to the PySpark launcher through the environment
environ['PYSPARK_SUBMIT_ARGS'] = '--packages {} pyspark-shell'.format(DEPENDENCIES)

# Create (or reuse) a local Spark session with 8 worker threads and the same
# package list set on the session builder
spark = sql.SparkSession.builder \
        .master("local[8]") \
        .config('spark.jars.packages', DEPENDENCIES) \
        .getOrCreate()

# Storage account key for the Azure Blob endpoint
spark.conf.set("fs.azure.account.key." +  storageAccountName + ".blob.core.windows.net", secret)

# Route the built-in session catalog through Iceberg's SparkSessionCatalog,
# backed by the Hive metastore at hive_uri.
# NOTE(review): these catalog settings are applied after the session already
# exists; Iceberg's documentation configures catalogs at session start.
# Consider moving them to builder .config(...) calls -- verify on the Spark
# version in use.
spark.conf.set("spark.sql.catalog.spark_catalog.warehouse", warehouse)
spark.conf.set("spark.sql.catalog.spark_catalog.type", "hive")
spark.conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
spark.conf.set("spark.sql.catalog.spark_catalog.uri", hive_uri)

In [None]:

# Fetch the current prices from Binance
tickers = client.get_all_tickers()

# Build a Spark DataFrame from the ticker records, cast price to decimal(38,8),
# and stamp every row with the current date and timestamp
tickersDF = (
    spark.createDataFrame(data=tickers)
    .withColumn("price", col("price").cast("decimal(38,8)"))
    .withColumn("date", current_date())
    .withColumn("ts", current_timestamp())
)

# Create the Iceberg table 'tickers' partitioned by date, replacing any
# existing table of that name
iceberg_writer = tickersDF.writeTo('tickers').using('iceberg').partitionedBy('date')
iceberg_writer.createOrReplace()

In [None]:
# Preview the contents of the 'tickers' table
tickers_table = spark.table("tickers")
tickers_table.show()

In [None]:
# Fetch current prices and append them to the Iceberg table
def get_tickers():
    """Fetch all current tickers from Binance and append them to the 'tickers' table.

    The price column is cast to decimal(38,8), and ``date`` and ``ts`` columns
    holding the current date and timestamp are added before the append.
    Returns None.
    """
    latest_prices = client.get_all_tickers()

    # Convert the ticker records to a Spark DataFrame, then type and stamp them
    prices_df = spark.createDataFrame(data=latest_prices)
    price_as_decimal = col("price").cast("decimal(38,8)")
    stamped_df = (
        prices_df
        .withColumn("price", price_as_decimal)
        .withColumn("date", current_date())
        .withColumn("ts", current_timestamp())
    )

    # Append the new rows to the existing table
    stamped_df.writeTo('tickers').append()

In [None]:
# Function to repeat a task
def load_tickers(interval, task=None, max_runs=None):
    """Run a task repeatedly, sleeping ``interval`` seconds between runs.

    Args:
        interval: Seconds to pause between consecutive runs (passed to
            ``time.sleep``).
        task: Zero-argument callable to execute on each run. Defaults to
            ``get_tickers``, looked up when this function is called.
        max_runs: Stop after this many runs. ``None`` (the default) keeps
            looping until the kernel is interrupted or the task raises,
            which matches the original behaviour.

    Returns:
        None.
    """
    if task is None:
        task = get_tickers

    runs_done = 0
    while max_runs is None or runs_done < max_runs:
        task()
        runs_done += 1
        if runs_done == max_runs:
            # Final run finished: return immediately instead of sleeping once more
            break
        time.sleep(interval)

In [None]:
# Start the polling loop: fetch and append tickers, pausing 10 seconds between runs
POLL_INTERVAL_SECONDS = 10
load_tickers(POLL_INTERVAL_SECONDS)

# Writing with DataFrames
See the [Apache Iceberg documentation](https://iceberg.apache.org/spark-writes/#writing-with-dataframes) for details:
<ul>
    <li>df.writeTo(t).create() is equivalent to CREATE TABLE AS SELECT</li>
    <li>df.writeTo(t).replace() is equivalent to REPLACE TABLE AS SELECT</li>
    <li>df.writeTo(t).append() is equivalent to INSERT INTO</li>
    <li>df.writeTo(t).overwritePartitions() is equivalent to dynamic INSERT OVERWRITE</li>
</ul>
