In [9]:
spark.stop()

In [1]:
import os
import sys
import findspark
import subprocess
from dotenv import load_dotenv

# Load AWS keys and S3 warehouse location
load_dotenv()

# Set Spark & Java env
os.environ["SPARK_HOME"] = "/opt/homebrew/Cellar/apache-spark/3.5.4/libexec"
os.environ["PYSPARK_PYTHON"] = "python3"
#os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
java_home = subprocess.getoutput("dirname $(dirname $(which java))")
print(java_home)
os.environ["JAVA_HOME"] = java_home
os.environ["AWS_REGION"] = "us-east-1"
os.environ["aws.region"] = "us-east-1"


# Use Iceberg + AWS Glue with Spark runtime 3.5
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages "
    "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0,"
    "software.amazon.awssdk:glue:2.20.128,"
    "software.amazon.awssdk:s3:2.20.128,"
    "software.amazon.awssdk:sts:2.20.128,"
    "software.amazon.awssdk:kms:2.20.128,"
    "software.amazon.awssdk:dynamodb:2.20.128,"
    "org.apache.hadoop:hadoop-aws:3.3.1 pyspark-shell"
)
#os.environ["PYSPARK_SUBMIT_ARGS"] = (
#    "--packages "
#    "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0,"
#    "org.apache.hadoop:hadoop-aws:3.3.1 pyspark-shell"
#)



# Append Spark Python path
sys.path.append("/opt/homebrew/Cellar/apache-spark/3.5.4/libexec/python/lib")

# Initialize Spark via findspark
findspark.init()
print("findspark initialized successfully!")


/opt/homebrew/opt/openjdk@11
findspark initialized successfully!


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("IcebergGlueCatalogSetup") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.my_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.my_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.catalog.my_catalog.warehouse", os.environ.get("s3_location")) \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")\
    .config("spark.hadoop.fs.s3a.access.key", os.environ.get("AWS_ACCESS_KEY_ID")) \
    .config("spark.hadoop.fs.s3a.secret.key", os.environ.get("AWS_SECRET_ACCESS_KEY")) \
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") \
    .getOrCreate()

print("Spark session started successfully!")


:: loading settings :: url = jar:file:/opt/homebrew/Cellar/apache-spark/3.5.4/libexec/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/sarthak/.ivy2/cache
The jars for the packages stored in: /Users/sarthak/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
software.amazon.awssdk#glue added as a dependency
software.amazon.awssdk#s3 added as a dependency
software.amazon.awssdk#sts added as a dependency
software.amazon.awssdk#kms added as a dependency
software.amazon.awssdk#dynamodb added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c7e4bb91-6d31-42c8-ba0e-17b9f04c4380;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.5.0 in central
	found software.amazon.awssdk#glue;2.20.128 in central
	found software.amazon.awssdk#aws-json-protocol;2.20.128 in central
	found software.amazon.awssdk#aws-core;2.20.128 in central
	found software.amazon.awssdk#annotations;2.20.128 in central
	found software.amazon.awssdk#regions;2.20.128 in central
	found sof

Spark session started successfully!


In [4]:
# find a better way
spark.sql("""
CREATE TABLE IF NOT EXISTS my_catalog.nonprofit_data_explorer.bmo_table (
    EIN STRING,
    NAME STRING,
    ICO STRING,
    STREET STRING,
    CITY STRING,
    STATE STRING,
    ZIP STRING,
    GROUP STRING,
    SUBSECTION STRING,
    AFFILIATION STRING,
    CLASSIFICATION STRING,
    RULING STRING,
    DEDUCTIBILITY STRING,
    FOUNDATION STRING,
    ACTIVITY STRING,
    ORGANIZATION STRING,
    STATUS STRING,
    TAX_PERIOD STRING,
    ASSET_CD STRING,
    INCOME_CD STRING,
    FILING_REQ_CD STRING,
    PF_FILING_REQ_CD STRING,
    ACCT_PD STRING,
    ASSET_AMT STRING,
    INCOME_AMT STRING,
    REVENUE_AMT STRING,
    NTEE_CD STRING,
    SORT_NAME STRING
)
USING iceberg
PARTITIONED BY (STATE)
""")


25/04/20 17:39:17 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


DataFrame[]

In [3]:
spark.sql("SHOW TABLES IN my_catalog.nonprofit_data_explorer").show()


+--------------------+---------+-----------+
|           namespace|tableName|isTemporary|
+--------------------+---------+-----------+
|nonprofit_data_ex...|bmo_table|      false|
+--------------------+---------+-----------+



In [4]:
s3_path = "s3a://nonprofit-explorer/raw_data/"

# Load CSVs from S3
df = spark.read.option("header", True).csv(s3_path)

# Show data
df.show()


25/04/20 17:54:02 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
25/04/20 17:54:04 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+---------+--------------------+-------------------+--------------------+--------------+-----+----------+-----+----------+-----------+--------------+------+-------------+----------+---------+------------+------+----------+--------+---------+-------------+----------------+-------+---------+----------+-----------+-------+--------------------+
|      EIN|                NAME|                ICO|              STREET|          CITY|STATE|       ZIP|GROUP|SUBSECTION|AFFILIATION|CLASSIFICATION|RULING|DEDUCTIBILITY|FOUNDATION| ACTIVITY|ORGANIZATION|STATUS|TAX_PERIOD|ASSET_CD|INCOME_CD|FILING_REQ_CD|PF_FILING_REQ_CD|ACCT_PD|ASSET_AMT|INCOME_AMT|REVENUE_AMT|NTEE_CD|           SORT_NAME|
+---------+--------------------+-------------------+--------------------+--------------+-----+----------+-----+----------+-----------+--------------+------+-------------+----------+---------+------------+------+----------+--------+---------+-------------+----------------+-------+---------+----------+-----------+-

In [5]:
# look for a better way
# Optional: balance your write to reduce small files
#df = df.repartition("STATE")

df = df.repartition(8)  # Optional, for performance/file-size control

# Append to your existing Iceberg table
df.writeTo("my_catalog.nonprofit_data_explorer.bmo_table").append()


                                                                                

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyApp") \
    .master("local[*]") \
    .getOrCreate()

print("Spark session started successfully!")
spark


In [8]:
df.writeTo("my_catalog.nonprofit.bmo") \
  .partitionedBy("STATE") \
  .using("iceberg") \
  .createOrReplace()


25/04/19 14:30:51 WARN HadoopTableOperations: Error reading version hint file s3a://nonprofit-explorer/nonprofit/bmo/metadata/version-hint.text
java.io.FileNotFoundException: No such file or directory: s3a://nonprofit-explorer/nonprofit/bmo/metadata/version-hint.text
	at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3356)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3185)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.extractOrFetchSimpleFileStatus(S3AFileSystem.java:4903)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:1200)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:1178)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:976)
	at org.apache.iceberg.hadoop.HadoopTableOperations.findVersion(HadoopTableOperations.java:318)
	at org.apache.iceberg.hadoop.HadoopTableOperations.refresh(HadoopTableOperations.java:104)
	at org.apache.iceberg.BaseTransaction.lambda$commitReplaceTransac

In [2]:
try:
    spark.stop()
    print("stopped")
except:
    print("already stopped")
    pass


already stopped


In [6]:
print(spark.conf.get("spark.sql.catalog.my_catalog.warehouse"))


s3a://nonprofit-explorer/


In [8]:
spark.sql("SELECT * FROM my_catalog.nonprofit_data_explorer.bmo_table").show()


[Stage 8:>                                                          (0 + 1) / 1]

+---------+--------------------+--------------------+--------------------+------------+-----+----------+-----+----------+-----------+--------------+------+-------------+----------+---------+------------+------+----------+--------+---------+-------------+----------------+-------+---------+----------+-----------+-------+--------------------+
|      EIN|                NAME|                 ICO|              STREET|        CITY|STATE|       ZIP|GROUP|SUBSECTION|AFFILIATION|CLASSIFICATION|RULING|DEDUCTIBILITY|FOUNDATION| ACTIVITY|ORGANIZATION|STATUS|TAX_PERIOD|ASSET_CD|INCOME_CD|FILING_REQ_CD|PF_FILING_REQ_CD|ACCT_PD|ASSET_AMT|INCOME_AMT|REVENUE_AMT|NTEE_CD|           SORT_NAME|
+---------+--------------------+--------------------+--------------------+------------+-----+----------+-----+----------+-----------+--------------+------+-------------+----------+---------+------------+------+----------+--------+---------+-------------+----------------+-------+---------+----------+-----------+----

                                                                                