
[SUPPORT] Push Hudi Commit Notification to HTTP URI with Callback | Passing Custom Headers? #8834

Closed
soumilshah1995 opened this issue May 28, 2023 · 2 comments
Labels
aws-support · feature-enquiry (issue contains feature enquiries/requests or great improvement ideas) · priority:minor (everything else; usability gaps; questions; feature reqs)

Comments

@soumilshah1995

soumilshah1995 commented May 28, 2023

Hello, I would like to ask the following question about callbacks.

My API requires HTTP basic authentication, which means I must provide a username and password.

Is there a way to pass custom values into the header when the API call is made through the callback?

I am trying to insert Hudi commit notifications directly into Elasticsearch, and to do that I have to pass auth credentials through a header. I could not find a setting for providing custom headers.
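For context, HTTP basic authentication just means the request carries an `Authorization: Basic <base64(username:password)>` header. A minimal sketch of the header the callback request would need to include (the credentials here are placeholders, not real ones):

```python
import base64


def basic_auth_header(username: str, password: str) -> dict:
    """Build the HTTP Basic auth header Elasticsearch expects."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}


# Placeholder credentials for illustration only.
headers = basic_auth_header("elastic", "changeme")
print(headers["Authorization"])  # Basic ZWxhc3RpYzpjaGFuZ2VtZQ==
```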

Sample Code

try:

    import os
    import sys
    import uuid
    import random
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from datetime import datetime, date, timedelta
    from faker import Faker
except Exception as e:
    print("Error : ", e)

SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

faker = Faker()


def get_customer_data(total_customers=2):
    customers_array = []
    for i in range(0, total_customers):
        customer_data = {
            "customer_id": str(uuid.uuid4()),
            "name": faker.name(),
            "state": faker.state(),
            "city": faker.city(),
            "email": faker.email(),
            "created_at": datetime.now().isoformat(),
            "address": faker.address(),
        }
        customers_array.append(customer_data)
    return customers_array


def get_orders_data(customer_ids, order_data_sample_size=3):
    orders_array = []
    for i in range(0, order_data_sample_size):
        try:
            order_id = str(uuid.uuid4())
            customer_id = random.choice(customer_ids)
            order_data = {
                "order_id": order_id,
                "name": faker.text(max_nb_chars=20),
                "order_value": str(random.randint(10, 1000)),
                "priority": random.choice(["LOW", "MEDIUM", "HIGH"]),
                "order_date": faker.date_between(start_date='-30d', end_date='today').strftime('%Y-%m-%d'),
                "customer_id": customer_id,
            }
            orders_array.append(order_data)
        except Exception as e:
            print(e)
    return orders_array


def upsert_hudi_table(
        db_name,
        table_name,
        record_id,
        precomb_key,
        spark_df,
        table_type='COPY_ON_WRITE',
        method='upsert',
):
    path = f"file:///C:/tmp/{db_name}/{table_name}"
    print("path", path, end="\n")

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.recordkey.field': record_id,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.precombine.field': precomb_key,
        'hoodie.write.commit.callback.on': 'true',
        'hoodie.write.commit.callback.http.url': 'http://localhost:9200/hudi_stats/_doc',
        'hoodie.write.commit.callback.http.timeout.seconds': '180',
    }
    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)


total_customers = 50
order_data_sample_size = 100

customer_data = get_customer_data(total_customers=total_customers)
spark_df_customers = spark.createDataFrame(data=[tuple(i.values()) for i in customer_data],
                                           schema=list(customer_data[0].keys()))
spark_df_customers.select(['customer_id', 'name', 'state', 'city', 'email', 'created_at']).show(3)

upsert_hudi_table(
    db_name='hudidb',
    table_name='customers',
    record_id='customer_id',
    precomb_key='created_at',
    spark_df=spark_df_customers,
    table_type='COPY_ON_WRITE',
    method='upsert',
)
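Before pointing the callback at Elasticsearch, it can help to see exactly what Hudi sends (body and headers). One option is to point `hoodie.write.commit.callback.http.url` at a tiny local server that just logs each request. A minimal sketch using only the standard library (the port 8000 and root path are assumptions, not anything Hudi requires):

```python
from http.server import BaseHTTPRequestHandler, HTTPServer


class CallbackEchoHandler(BaseHTTPRequestHandler):
    """Log the headers and body of each callback POST, then reply 200."""

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length).decode("utf-8", errors="replace")
        print("headers:", dict(self.headers))
        print("body:", body)
        self.send_response(200)
        self.end_headers()


# To run, then set hoodie.write.commit.callback.http.url to http://localhost:8000/ :
#   HTTPServer(("localhost", 8000), CallbackEchoHandler).serve_forever()
```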

If there is a way, I would love to learn how we can pass in custom headers.

References:
https://hudi.apache.org/docs/next/configurations#COMMIT_CALLBACK

@xushiyan added the aws-support, feature-enquiry, and priority:minor labels on May 31, 2023
@ad1happy2go
Collaborator

@soumilshah1995 Thanks for raising this. Hudi doesn't have any way to pass custom headers at the moment.

Created JIRA to track this improvement - https://issues.apache.org/jira/browse/HUDI-6441
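Until that improvement lands, one avenue worth noting: Hudi's callback implementation is pluggable via `hoodie.write.commit.callback.class` (the default is the built-in HTTP callback). A custom class written in Java/Scala and placed on the Spark classpath could attach whatever auth headers it needs. On the PySpark side the wiring would look like the sketch below; the class name `com.example.AuthHttpCallback` is hypothetical, something you would have to implement yourself:

```python
# Sketch: route commit notifications through a custom callback class.
# 'com.example.AuthHttpCallback' is a hypothetical JVM class that would
# implement Hudi's commit-callback interface and add the Authorization
# header itself before posting to Elasticsearch.
hudi_callback_options = {
    "hoodie.write.commit.callback.on": "true",
    "hoodie.write.commit.callback.class": "com.example.AuthHttpCallback",
}
```

These options would be merged into `hudi_options` alongside the table settings shown in the sample code above.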

@soumilshah1995
Author

Thanks
