In [27]:
import pyspark
from pyspark.sql import SparkSession
# Catalog name
catalog_name = "iceberg"
# Database name
db_name = "dev"
# Table name
table_name = "forex_minute_aggs"
# Env vars
s3_user = "jared"
s3_password = "password"
# Schema
from pyspark.sql import Row
from pyspark.sql.types import *
forex_schema = StructType(
    [
        StructField("ticker", StringType(), True),
        StructField("volume", LongType(), True),
        StructField("open", DoubleType(), True),
        StructField("close", DoubleType(), True),
        StructField("high", DoubleType(), True),
        StructField("low", DoubleType(), True),
        StructField("window_start_ns", LongType(), True),
        StructField("transactions", LongType(), True),
        StructField("date_time", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("month", IntegerType(), True),
        StructField("day", IntegerType(), True),
    ]
)
# Source data location
source_loc = "s3a://polygon/unzipped/global_forex/minute_aggs_v1/"

In [28]:
# Initialize Spark Session with Hive support
spark = SparkSession.builder \
    .appName("SparkHiveMinIO") \
    .master("spark://spark-master:7077") \
    .config("spark.hadoop.hive.metastore.uris", "thrift://hive-metastore:9083") \
    .config("spark.metastore.warehouse.dir", "s3a://hive") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.access.key", s3_user) \
    .config("spark.hadoop.fs.s3a.secret.key", s3_password) \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.603,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.1") \
    .config("spark.sql.parquet.enableVectorizedReader", "true") \
    .config(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.type", "hive") \
    .config(f"spark.sql.catalog.{catalog_name}.uri", "thrift://hive-metastore:9083") \
    .config(f"spark.sql.catalog.{catalog_name}.warehouse", "s3a://iceberg/") \
    .config(f"spark.sql.catalog.{catalog_name}.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config(f"spark.sql.catalog.{catalog_name}.fs.s3a.access.key", s3_user) \
    .config(f"spark.sql.catalog.{catalog_name}.fs.s3a.secret.key", s3_password) \
    .getOrCreate()

# Verify connection to Spark/Hive
spark.sql("SHOW DATABASES").show()
sc = spark.sparkContext
sc.setLogLevel("DEBUG")

+---------+
|namespace|
+---------+
|  default|
+---------+



In [29]:
# Read all files from location
print("Reading files from object storage...")
parquet_df = spark.read.schema(forex_schema).parquet(source_loc)
print("Dataframe created.")

Reading files from object storage...
Dataframe created.


In [30]:
# Create tables
spark.sql(
    f"""
    CREATE DATABASE IF NOT EXISTS {catalog_name}.{db_name}
    """
)
spark.sql(
    f"""
    CREATE TABLE IF NOT EXISTS {catalog_name}.{db_name}.{table_name} (
        ticker string,
        volume integer,
        open double,
        close double,
        high double,
        low double,
        window_start_ns long,
        transactions long,
        date_time string,
        year integer,
        month integer,
        day integer
    )
    USING iceberg
    PARTITIONED BY (ticker);
    """
)

DataFrame[]

In [None]:
parquet_df.cache()

In [31]:
# Write files to Iceberg table
parquet_df.write.format("iceberg").mode("overwrite").save(f"{catalog_name}.{db_name}.{table_name}")

In [24]:
#spark.sql(
#    f"""
#    DROP TABLE IF EXISTS {catalog_name}.{db_name}.{table_name};
#    """
#)
#spark.sql(
#    f"""
#    DROP DATABASE IF EXISTS {catalog_name}.{db_name} CASCADE;
#    """
#)

DataFrame[]