In [1]:
# import libraries
import json

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro, to_avro
from pyspark.sql.functions import col, current_timestamp, struct, to_json, lit

In [2]:
def get_avro_schema(spark_df, schema_type: str, name: str, namespace: str):
    """
    Returns the corresponding avro schema for the passed in spark dataframe.
    The type mapping covers most commonly used types, every field is made to be nullable.
    """

    schema_base = {"type": schema_type, "namespace": namespace, "name": name}

    # Keys are Spark Types, Values are Avro Types
    avro_mapping = {
        "StringType": ["string", "null"],
        "LongType": ["long", "null"],
        "IntegerType": ["int", "null"],
        "BooleanType": ["boolean", "null"],
        "FloatType": ["float", "null"],
        "DoubleType": ["double", "null"],
        "TimestampType": ["long", "null"],
        "ArrayType(StringType,true)": [
            {"type": "array", "items": ["string", "null"]},
            "null",
        ],
        "ArrayType(IntegerType,true)": [
            {"type": "array", "items": ["int", "null"]},
            "null",
        ],
    }

    fields = []

    for field in spark_df.schema.fields:
        if str(field.dataType) in avro_mapping:
            fields.append(
                {"name": field.name, "type": avro_mapping[str(field.dataType)]}
            )
        else:
            fields.append({"name": field.name, "type": str(field.dataType)})

    schema_base["fields"] = fields

    return json.dumps(schema_base)
def generate_avro_schema_from_json(json_data):
    def avro_type_mapping(value):
        if value is None:
            return ["string", "null"]
        elif isinstance(value, bool):
            return ["boolean", "null"]
        elif isinstance(value, int):
            return ["int", "null"]
        elif isinstance(value, float):
            return ["double", "null"]
        elif isinstance(value, str):
            return ["string", "null"]
        else:
            return ["string", "null"]

    avro_schema = {
        "type": "record",
        "name": "Default_schema",
        "fields": [
            {"name": key, "type": avro_type_mapping(value)}
            for key, value in json_data.items()
        ],
    }
    return avro_schema

In [3]:
# init session
spark = (
    SparkSession.builder.appName("delivery-data-from-sap-hana-to-kafka")
    .config(
        "spark.jars.packages",
        "org.postgresql:postgresql:42.7.1,"+
        "com.sap.cloud.db.jdbc:ngdbc:2.19.15,"+
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
    )
    .enableHiveSupport()
    .getOrCreate()
)

In [4]:
# show configured parameters
SparkConf().getAll()

[('spark.files',
  'file:///home/jovyan/.ivy2/jars/org.postgresql_postgresql-42.7.1.jar,file:///home/jovyan/.ivy2/jars/com.sap.cloud.db.jdbc_ngdbc-2.19.15.jar,file:///home/jovyan/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.5.0.jar,file:///home/jovyan/.ivy2/jars/org.apache.spark_spark-avro_2.12-3.5.0.jar,file:///home/jovyan/.ivy2/jars/org.checkerframework_checker-qual-3.41.0.jar,file:///home/jovyan/.ivy2/jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.5.0.jar,file:///home/jovyan/.ivy2/jars/org.apache.kafka_kafka-clients-3.4.1.jar,file:///home/jovyan/.ivy2/jars/com.google.code.findbugs_jsr305-3.0.0.jar,file:///home/jovyan/.ivy2/jars/org.apache.commons_commons-pool2-2.11.1.jar,file:///home/jovyan/.ivy2/jars/org.apache.hadoop_hadoop-client-runtime-3.3.4.jar,file:///home/jovyan/.ivy2/jars/org.lz4_lz4-java-1.8.0.jar,file:///home/jovyan/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.10.3.jar,file:///home/jovyan/.ivy2/jars/org.slf4j_slf4j-api-2.0.7.jar,file:///home/jovya

In [5]:
# set log level
spark.sparkContext.setLogLevel("INFO")

In [6]:
df = (
    spark.read.format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", "jdbc:postgresql://database:5432/database")
    .option("dbtable", "company")
    .option("user", "postgres")
    .option("password", "postgres")
    .load()
)

In [7]:
# df = (
#     spark.read.format("jdbc")
#     .option("driver", "com.sap.db.jdbc.Driver")
#     # .option("url", "jdbc:sap://10.158.2.40:30041/HAQ")
#     .option("url", "jdbc:sap://10.163.9.4:30041/HAQ")
#     .option("dbtable", "SAPHANADB.CRCO")
#     .option("numPartitions", 4)
#     .option("user", "SYNAPSE_READ")
#     .option("password", "Syn@ps322SAP22")
#     .load()
# )

In [8]:
df_processed = (
        df.withColumn("ingestion_time", lit(current_timestamp()))
        .withColumn("source_system", lit("sap"))
        .withColumn("user_name", lit("gersonrs"))
        .withColumn("ingestion_type", lit("spark"))
        .withColumn("base_format", lit("table"))
        .withColumn("rows_written", lit(df.count()))
        .withColumn("schema", lit(df.schema.json()))
    )

In [9]:
df_processed.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- address: string (nullable = true)
 |-- salary: float (nullable = true)
 |-- ingestion_time: timestamp (nullable = false)
 |-- source_system: string (nullable = false)
 |-- user_name: string (nullable = false)
 |-- ingestion_type: string (nullable = false)
 |-- base_format: string (nullable = false)
 |-- rows_written: integer (nullable = false)
 |-- schema: string (nullable = false)



In [10]:
print("total de registros: " + str(df.count()))

total de registros: 7


In [11]:
ds = (
    df_processed.select(to_json(struct("*")).alias("value"))
    .selectExpr("CAST(value AS STRING)")
    .write.format("kafka")
    .option("kafka.bootstrap.servers", "broker:29092")
    .option("topic", "topic2")
    .save()
)

In [12]:
js = json.loads(json_df.collect()[0][0])
print(js)
avro_schema1 = generate_avro_schema_from_json(js)
avro_schema = get_avro_schema(df, "record", "Default_schema", "Default_namespace")
avro_schema1

NameError: name 'json_df' is not defined

In [None]:
df_processed.collect()

In [None]:
ttt = df_processed.select(to_avro(col("id")).alias("key"))
ttt.collect()

In [None]:
df_transformed = df.select(to_json(struct("*")).alias("value")).selectExpr("CAST(value AS STRING)")

In [None]:
df_transformed.show()

In [None]:
avroDf = df_transformed.select(to_avro(df_transformed.value).alias("avro"))

In [None]:
avroDf.collect()

In [None]:
json_df = df.select(to_json(struct(df.columns)).alias("value"))
json_df.show()

In [None]:
df_avro = json_df.select(to_avro(to_json(struct("value"), avro_schema1)).alias("value"))
df_avro.collect()

In [None]:
t = json_df.select(to_json(struct("*")).alias("value")).selectExpr("CAST(value AS STRING)")
t.show()

In [None]:
ds1 = (
    df_avro
    .write.format("kafka")
    .option("kafka.bootstrap.servers", "broker:29092")
    .option("topic", "topic2")
    .save()
)