[BUG] Multi-cluster writes to Delta Lake Storage in S3 #1498

Closed · soumilshah1995 opened this issue Nov 25, 2022 · 12 comments
Labels: bug (Something isn't working)

soumilshah1995 commented Nov 25, 2022

Trying out the multi-cluster writes setup described in:

https://delta.io/blog/2022-05-18-multi-cluster-writes-to-delta-lake-storage-in-s3/
https://docs.google.com/document/d/1Gs4ZsTH19lMxth4BSdwlWjUNR-XhKHicDvBjd2RqNd8/edit#

DynamoDB tables (screenshot attached)

jar files (screenshot attached)

jar file path provided (screenshot attached)

When I run my code, I don't see any entries in the DynamoDB table.
Here is the code:

try:
    import os
    import sys

    import pyspark
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, asc, desc
    from pyspark.sql.functions import *
    from delta.tables import *
    from delta.tables import DeltaTable

    print("All modules are loaded .....")

except Exception as e:
    print("Some modules are missing {} ".format(e))


class DeltaLakeHelper(object):
    """
    Delta Lakes Python helper class that aids in basic operations such as inserting, updating, deleting, merging, removing older files and versions, and generating Athena manifest files.
    """

    def __init__(self, delta_lake_path: str):

        self.spark = self.__create_spark_session()
        self.delta_lake_path = delta_lake_path
        self.delta_df = None

    def generate_manifest_files(self):
        """

        Generates Manifest file for Athena

        :return: Bool
        """
        self.delta_df.generate("symlink_format_manifest")
        return True

    def __generate_delta_df(self):
        try:
            if self.delta_df is None:
                self.delta_df = DeltaTable.forPath(self.spark, self.delta_lake_path)
        except Exception as e:
            pass

    def compact_table(self, num_of_files=10):
        """

        Compacts smaller Parquet files into larger files

        :param num_of_files: Int
        :return: Bool

        """
        self.spark.read.format("delta") \
            .load(self.delta_lake_path) \
            .repartition(num_of_files) \
            .write.option("dataChange", "false") \
            .format("delta") \
            .mode("overwrite") \
            .save(self.delta_lake_path)
        return True

    def delete_older_files_versions(self):
        """

        Deletes Older Version and calls vacuum(0)

        :return: Bool
        """
        self.__generate_delta_df()
        self.delta_df.vacuum(0)
        return True

    def insert_overwrite_records_delta_lake(self, spark_df, max_record_per_file='10000'):
        """
        Inserts into Delta Lake

        :param spark_df: Pyspark Dataframe
        :param max_record_per_file: str ie max_record_per_file= "10000"
        :return:Bool
        """
        spark_df.write.format("delta") \
            .mode("overwrite") \
            .option("maxRecordsPerFile", max_record_per_file) \
            .save(self.delta_lake_path)

        return True

    def append_records_delta_lake(self, spark_df, max_record_per_file="10000"):
        """

        Append data into Delta lakes

        :param spark_df: Pyspark Dataframe
        :param max_record_per_file: str ie max_record_per_file= "10000"
        :return: Bool
        """
        spark_df.write.format("delta") \
            .mode('append') \
            .option("maxRecordsPerFile", max_record_per_file) \
            .save(self.delta_lake_path)
        return True

    def update_records_delta_lake(self, condition="", value_to_set={}):
        """

        Set the value on delta lake

        :param condition : Str Example:  condition="emp_id = '3'"
        :param value_to_set: Dict IE  value_to_set={"employee_name": "'THIS WAS UPDATE ON DELTA LAKE'"}
        :return: Bool
        """
        self.__generate_delta_df()
        self.delta_df.update(condition, value_to_set)
        return True

    def upsert_records_delta_lake(self, old_data_key, new_data_key, new_spark_df):
        """

        Find one and update in the Delta lake.
        If the record is found it is updated; if not, it is inserted.
        See Examples on How to use this

        :param old_data_key: Key is nothing but Column on which you want to merge or upsert data into delta lake
        :param new_data_key: Key is nothing but Column on which you want to merge or upsert data into delta lake
        :param new_spark_df: Spark DataFrame
        :return: Bool
        """
        self.__generate_delta_df()
        dfUpdates = new_spark_df

        self.delta_df.alias('oldData') \
            .merge(dfUpdates.alias('newData'), f'oldData.{old_data_key} = newData.{new_data_key}') \
            .whenMatchedUpdateAll() \
            .whenNotMatchedInsertAll() \
            .execute()

        return True

    def delete_records_delta_lake(self, condition=""):
        """

        Deletes records from the Delta lake that match the condition

        :param condition:Str IE condition="emp_id = '4'"
        :return:Bool
        """
        self.__generate_delta_df()
        self.delta_df.delete(condition)
        return True

    def read_delta_lake(self):
        """

        Reads from Delta lakes

        :return: Spark DF
        """
        df_read = self.spark.read.format("delta").load(self.delta_lake_path)
        return df_read

    def __create_spark_session(self):
        self.spark = SparkSession \
            .builder \
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
            .config("spark.databricks.delta.retentionDurationCheck.enabled", "false") \
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
            .config("spark.delta.logStore.s3.impl", "io.delta.storage.S3DynamoDBLogStore") \
            .getOrCreate()
        return self.spark


def main():

    try:
        from awsglue.job import Job
        from awsglue.utils import getResolvedOptions
        from awsglue.dynamicframe import DynamicFrame
        from awsglue.context import GlueContext
    except Exception as e:
        pass

    helper = DeltaLakeHelper(delta_lake_path="s3a://glue-learn-begineers/deltalake/delta_table")

    args = getResolvedOptions(sys.argv, ["JOB_NAME"])
    spark = helper.spark
    sc = spark.sparkContext
    glueContext = GlueContext(sc)
    job = Job(glueContext)
    job.init(args["JOB_NAME"], args)

    # ====================================================
    """Create Spark Data Frame """
    # ====================================================
    data = [
        (1, "this is insert 1 ", "Sales", "RJ", 81000, 30, 23000, 827307999),
        (2, "this is insert 2", "Engineering", "RJ", 79000, 53, 15000, 1627694678),
        (3, "this is insert 3", "Engineering", "RJ", 79000, 53, 15000, 1627694678),
        (4, "this is insert 3", "Engineering", "RJ", 79000, 53, 15000, 1627694678),
    ]
    columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
    df_write = spark.createDataFrame(data=data, schema=columns)
    helper.insert_overwrite_records_delta_lake(spark_df=df_write)

    # ====================================================
    """Appending  """
    # ====================================================
    data = [
        (5, "this is append", "Engineering", "RJ", 79000, 53, 15000, 1627694678),
    ]
    columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
    df_append = spark.createDataFrame(data=data, schema=columns)
    helper.append_records_delta_lake(spark_df=df_append)


    # ====================================================
    """READ FROM DELTA LAKE  """
    # ====================================================
    df_read = helper.read_delta_lake()
    print("READ", df_read.show())

    # ====================================================
    """UPDATE DELTA LAKE"""
    # ====================================================
    helper.update_records_delta_lake(condition="emp_id = '3'",
                                     value_to_set={"employee_name": "'THIS WAS UPDATE ON DELTA LAKE'"})

    # ====================================================
    """ DELETE DELTA LAKE"""
    # ====================================================
    helper.delete_records_delta_lake(condition="emp_id = '4'")

    # ====================================================
    """ FIND ONE AND UPDATE OR UPSERT DELTA LAKE """
    # ====================================================
    new_data = [
        (2, "this is update on delta lake ", "Sales", "RJ", 81000, 30, 23000, 827307999),
        (11, "This should be append ", "Engineering", "RJ", 79000, 53, 15000, 1627694678),
    ]

    columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
    usr_up_df = spark.createDataFrame(data=new_data, schema=columns)

    helper.upsert_records_delta_lake(old_data_key='emp_id',
                                     new_data_key='emp_id',
                                     new_spark_df=usr_up_df)

    # ====================================================
    """ Compaction DELTA Prune Older Version and Create larger Files """
    # ====================================================
    helper.compact_table(num_of_files=2)
    helper.delete_older_files_versions()

    # ====================================================
    """ Create Manifest File for Athena """
    # ====================================================
    helper.generate_manifest_files()

    job.commit()

main()

Please advise.

soumilshah1995 (Author)

Hi, are there any updates?

tdas (Contributor) commented Nov 28, 2022

Can you get the log4j logs of your application by configuring log4j.properties? Here are some links I found that may help in setting it up.

With INFO-level logs, we will be able to see what the io.delta.storage.S3DynamoDBLogStore class is doing, and whether it is being loaded correctly or not.
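
As a quick sanity check, a minimal sketch (not something from this thread, and it relies on PySpark's internal py4j handle `_jvm`, so treat it purely as a debugging aid) that asks the driver JVM whether the class is loadable at all:

from pyspark.sql import SparkSession

# Reuse the already-configured session (in the Glue job above it is helper.spark).
spark = SparkSession.builder.getOrCreate()

try:
    # Ask the JVM to load the LogStore class; this raises a Py4J error wrapping
    # ClassNotFoundException if the delta-storage jar is not on the driver classpath.
    spark.sparkContext._jvm.java.lang.Class.forName("io.delta.storage.S3DynamoDBLogStore")
    print("io.delta.storage.S3DynamoDBLogStore is loadable")
except Exception as err:
    print("io.delta.storage.S3DynamoDBLogStore could not be loaded:", err)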

soumilshah1995 (Author)

I can try to go to CloudWatch and get the logs if that's what you want. As mentioned, the job succeeds and does not throw an error.

soumilshah1995 commented Nov 29, 2022

@tdas

2022-11-26T06:13:22.523-05:00

+------+-----------------+-----------+-----+------+---+-----+----------+
|emp_id|    employee_name| department|state|salary|age|bonus|        ts|
+------+-----------------+-----------+-----+------+---+-----+----------+
|     4| this is insert 3|Engineering|   RJ| 79000| 53|15000|1627694678|
|     2| this is insert 2|Engineering|   RJ| 79000| 53|15000|1627694678|
|     3| this is insert 3|Engineering|   RJ| 79000| 53|15000|1627694678|
|     5|   this is append|Engineering|   RJ| 79000| 53|15000|1627694678|
|     1|this is insert 1 |      Sales|   RJ| 81000| 30|23000| 827307999|
+------+-----------------+-----------+-----+------+---+-----+----------+

READ None

More LOGS

2022-11-26 11:12:45,696 main WARN JNDI lookup class is not available because this JRE does not support JNDI. JNDI string lookups will not be available, continuing configuration. java.lang.ClassNotFoundException: org.apache.logging.log4j.core.lookup.JndiLookup
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:264)
	at org.apache.logging.log4j.util.LoaderUtil.loadClass(LoaderUtil.java:173)
	at org.apache.logging.log4j.util.LoaderUtil.newInstanceOf(LoaderUtil.java:211)
	at org.apache.logging.log4j.util.LoaderUtil.newCheckedInstanceOf(LoaderUtil.java:232)
	at org.apache.logging.log4j.core.util.Loader.newCheckedInstanceOf(Loader.java:301)
	at org.apache.logging.log4j.core.lookup.Interpolator.<init>(Interpolator.java:95)
	at org.apache.logging.log4j.core.config.AbstractConfiguration.<init>(AbstractConfiguration.java:114)
	at org.apache.logging.log4j.core.config.DefaultConfiguration.<init>(DefaultConfiguration.java:55)
	at org.apache.logging.log4j.core.layout.PatternLayout$Builder.build(PatternLayout.java:430)
	at org.apache.logging.log4j.core.layout.PatternLayout.createDefaultLayout(PatternLayout.java:324)
	at org.apache.logging.log4j.core.appender.ConsoleAppender$Builder.<init>(ConsoleAppender.java:121)
	at org.apache.logging.log4j.core.appender.ConsoleAppender.newBuilder(ConsoleAppender.java:111)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.logging.log4j.core.config.plugins.util.PluginBuilder.createBuilder(PluginBuilder.java:158)
	at org.apache.logging.log4j.core.config.plugins.util.PluginBuilder.build(PluginBuilder.java:119)
	at org.apache.logging.log4j.core.config.AbstractConfiguration.createPluginObject(AbstractConfiguration.java:813)
	at org.apache.logging.log4j.core.config.AbstractConfiguration.createConfiguration(AbstractConfiguration.java:753)
	at org.apache.logging.log4j.core.config.AbstractConfiguration.createConfiguration(AbstractConfiguration.java:745)
	at org.apache.logging.log4j.core.config.AbstractConfiguration.doConfigure(AbstractConfiguration.java:389)
	at org.apache.logging.log4j.core.config.AbstractConfiguration.initialize(AbstractConfiguration.java:169)
	at org.apache.logging.log4j.core.config.AbstractConfiguration.start(AbstractConfiguration.java:181)
	at org.apache.logging.log4j.core.LoggerContext.setConfiguration(LoggerContext.java:446)
	at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:520)
	at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:536)
	at org.apache.logging.log4j.core.LoggerContext.start(LoggerContext.java:214)
	at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:146)
	at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:41)
	at org.apache.logging.log4j.LogManager.getContext(LogManager.java:194)
	at org.apache.logging.log4j.LogManager.getLogger(LogManager.java:597)
	at org.apache.spark.metrics.sink.MetricsConfigUtils.<clinit>(MetricsConfigUtils.java:12)
	at org.apache.spark.metrics.sink.MetricsProxyInfo.fromConfig(MetricsProxyInfo.java:17)
	at com.amazonaws.services.glue.cloudwatch.CloudWatchLogsAppenderCommon.<init>(CloudWatchLogsAppenderCommon.java:62)
	at com.amazonaws.services.glue.cloudwatch.CloudWatchLogsAppenderCommon$CloudWatchLogsAppenderCommonBuilder.build(CloudWatchLogsAppenderCommon.java:79)
	at com.amazonaws.services.glue.cloudwatch.CloudWatchAppender.activateOptions(CloudWatchAppender.java:73)
	at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
	at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:172)
	at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:104)
	at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:842)
	at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
	at org.apache.log4j.PropertyConfigurator.configureRootCategory(PropertyConfigurator.java:648)
	at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:514)
	at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)
	at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
	at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
	at org.slf4j.impl.Log4jLoggerFactory.<init>(Log4jLoggerFactory.java:66)
	at org.slf4j.impl.StaticLoggerBinder.<init>(StaticLoggerBinder.java:72)
	at org.slf4j.impl.StaticLoggerBinder.<clinit>(StaticLoggerBinder.java:45)
	at org.slf4j.LoggerFactory.bind(LoggerFactory.java:150)
	at org.slf4j.LoggerFactory.performInitialization(LoggerFactory.java:124)
	at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:412)
	at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:357)
	at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:383)
	at org.apache.spark.network.util.JavaUtils.<clinit>(JavaUtils.java:41)
	at org.apache.spark.internal.config.ConfigHelpers$.byteFromString(ConfigBuilder.scala:67)
	at org.apache.spark.internal.config.ConfigBuilder.$anonfun$bytesConf$1(ConfigBuilder.scala:259)
	at org.apache.spark.internal.config.ConfigBuilder.$anonfun$bytesConf$1$adapted(ConfigBuilder.scala:259)
	at org.apache.spark.internal.config.TypedConfigBuilder.$anonfun$transform$1(ConfigBuilder.scala:101)
	at org.apache.spark.internal.config.TypedConfigBuilder.createWithDefault(ConfigBuilder.scala:144)
	at org.apache.spark.internal.config.package$.<init>(package.scala:345)
	at org.apache.spark.internal.config.package$.<clinit>(package.scala)
	at org.apache.spark.SparkConf$.<init>(SparkConf.scala:654)
	at org.apache.spark.SparkConf$.<clinit>(SparkConf.scala)
	at org.apache.spark.SparkConf.set(SparkConf.scala:94)
	at org.apache.spark.SparkConf.$anonfun$loadFromSystemProperties$3(SparkConf.scala:76)
	at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:788)
	at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:230)
	at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:461)
	at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:461)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:787)
	at org.apache.spark.SparkConf.loadFromSystemProperties(SparkConf.scala:75)
	at org.apache.spark.SparkConf.<init>(SparkConf.scala:70)
	at org.apache.spark.SparkConf.<init>(SparkConf.scala:59)
	at com.amazonaws.services.glue.SparkProcessLauncherPlugin.getSparkConf(ProcessLauncher.scala:41)
	at com.amazonaws.services.glue.SparkProcessLauncherPlugin.getSparkConf$(ProcessLauncher.scala:40)
	at com.amazonaws.services.glue.ProcessLauncher$$anon$1.getSparkConf(ProcessLauncher.scala:78)
	at com.amazonaws.services.glue.ProcessLauncher.<init>(ProcessLauncher.scala:84)
	at com.amazonaws.services.glue.ProcessLauncher.<init>(ProcessLauncher.scala:78)
	at com.amazonaws.services.glue.ProcessLauncher$.main(ProcessLauncher.scala:29)
	at com.amazonaws.services.glue.ProcessLauncher.main(ProcessLauncher.scala)


soumilshah1995 (Author)

Any updates?

scottsand-db (Collaborator)

  • Did you create the DDB table yourself, or was it created by Delta automatically? (If you created it manually, see the sketch below.)
  • As TD mentioned above, getting the INFO and DEBUG logs would be super useful to check which LogStore is being used at runtime.

It seems you're having some issues getting the logs, right? Given the error message above, would this link help? https://stackoverflow.com/questions/70383503/after-log4j-changes-hive-e-returns-additional-warning-which-has-impact-on-the-s
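
For reference, a minimal boto3 sketch of the table the LogStore expects; the table name and region are placeholders, and the key schema (tablePath as partition key, fileName as sort key) follows the Delta docs for io.delta.storage.S3DynamoDBLogStore:

import boto3

# Placeholders: use the table name/region you configure for the LogStore.
ddb = boto3.client("dynamodb", region_name="us-east-1")

ddb.create_table(
    TableName="delta_log",
    AttributeDefinitions=[
        {"AttributeName": "tablePath", "AttributeType": "S"},
        {"AttributeName": "fileName", "AttributeType": "S"},
    ],
    KeySchema=[
        {"AttributeName": "tablePath", "KeyType": "HASH"},   # partition key
        {"AttributeName": "fileName", "KeyType": "RANGE"},   # sort key
    ],
    BillingMode="PAY_PER_REQUEST",
)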

tdas (Contributor) commented Dec 1, 2022

Hey @soumilshah1995, are you sure this is the only error? What you pasted is a WARNING, not an ERROR, that the log4j library is printing. We want to see INFO-level logs that contain "DynamoDBLogStore". Are you getting any logs with "INFO" in them? If not, you can set the log level in your code with sparkContext.setLogLevel("INFO") before any Delta operation. You should then see info-level logs; grep them for "LogStore" and share that filtered log with us for debugging.
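
In the posted job, that change would look roughly like this (a sketch reusing the DeltaLakeHelper class and S3 path from the code above):

# Build the helper exactly as in the posted job, then raise the driver log level
# before any Delta read/write so the LogStore INFO lines reach CloudWatch.
helper = DeltaLakeHelper(delta_lake_path="s3a://glue-learn-begineers/deltalake/delta_table")
helper.spark.sparkContext.setLogLevel("INFO")

# ... run the Delta operations, then filter the CloudWatch logs for "LogStore".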

soumilshah1995 (Author)

This is all I have. Please let me know if you want me to include anything specific in the code above.

soumilshah1995 (Author)

Any updates?

scottsand-db self-assigned this Dec 15, 2022
scottsand-db (Collaborator)

Hi @soumilshah1995 - did you follow TD's suggestion to include sparkContext.setLogLevel("INFO") in your Spark setup?

Also, if you are having specific log4j issues, I would suggest posting in a Spark thread, as Spark has a larger community and there will likely be someone there who has already come across your issue.

soumilshah1995 commented Jan 3, 2023

Thanks a lot, Scott. We switched to Apache Hudi instead of Delta Lake due to these limitations. I appreciate the help.

scottsand-db (Collaborator)

Hi @soumilshah1995, I just realized that you are using Delta Lake 1.1.0.

In Delta Lake 1.2, we switched to using the delta-storage module ... to allow custom log stores ... like the DynamoDB log store ...

https://docs.delta.io/latest/porting.html#delta-lake-1-1-or-below-to-delta-lake-1-2-or-above

So, multi-cluster writes aren't working because you aren't using a new enough version of Delta Lake! Please use version 1.2 or above.
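
For reference, a minimal sketch of the session configuration this points at, assuming Delta Lake 1.2+ with the matching delta-storage-s3-dynamodb jar on the job classpath (the DynamoDB table name and region below are placeholders, not values from this thread):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Route s3a:// paths through the multi-cluster LogStore shipped in delta-storage (1.2+).
    .config("spark.delta.logStore.s3a.impl", "io.delta.storage.S3DynamoDBLogStore")
    # DynamoDB table used for commit coordination (placeholder name/region).
    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "delta_log")
    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-east-1")
    .getOrCreate()
)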
