In [0]:
from delta.tables import DeltaTable

# Input: silver-layer Delta table with the Olist order reviews to translate.
# (CSV reader options such as header/sep/inferSchema have no meaning for a
# Delta source, so none are passed.)
file_location = "dbfs:/mnt/olist-silver/order_reviews_fortrans"
file_type = "Delta"

# Working copy of the input, and the table that collects the translations.
source_table = 'default.reviews_to_translate_copy'
destination_table = 'default.reviews_to_translate_copy_result'

# True: drop and rebuild both tables, discarding any translation progress.
# False: keep existing tables so an interrupted translation run can resume
# (the translation loop skips rows already flagged as processed).
reset_tables = True

df = spark.read.format(file_type).load(file_location)

if reset_tables:
    spark.sql(f"drop table if exists {source_table}")
    spark.sql(f"drop table if exists {destination_table}")

# Only (re)create the working copy when it does not exist yet.
if not spark.catalog.tableExists(source_table):
    df.write.format("delta").mode("overwrite").saveAsTable(source_table)

In [0]:
# Create the result table on first run: a copy of the source rows plus the
# columns the translation loop fills in. If the table already exists it is
# reused as-is, keeping whatever translation progress it holds.
if not spark.catalog.tableExists(destination_table):
    source_df = spark.read.table(source_table)
    source_df.write.format("delta").mode("append").saveAsTable(destination_table)
    spark.sql(
        f"alter table {destination_table} add columns ("
        "status_code int, "
        "processed boolean, "
        "processed_at timestamp, "
        "review_comment_title_translated string, "
        "review_comment_message_tr_translated string, "
        "error string)"
    )
    # Flag every copied row as pending; `processed` is BOOLEAN, so use a boolean
    # literal rather than relying on an int-to-boolean cast.
    spark.sql(f"UPDATE {destination_table} SET processed = false")

destination_delta_table = DeltaTable.forName(spark, destination_table)

In [0]:
import requests, uuid, json
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
from pyspark.sql import functions as f
from pyspark.sql.functions import udf, struct, lit
from datetime import datetime, timezone
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, TimestampType

In [0]:
def is_null_or_empty(input_str: str) -> bool:
    """Return True if ``input_str`` is None, empty, whitespace-only, or the literal "null".

    The "null" comparison is case-insensitive and ignores surrounding
    whitespace, so ``"NULL"`` and ``" null "`` both count as missing.
    """
    if not input_str:
        return True
    stripped = input_str.strip()
    return not stripped or stripped.lower() == "null"

def find_index_in_list(input_list, command_type):
    """Return the position of the first element equal to ``command_type``.

    Returns -1 instead of raising when the element is absent.
    """
    try:
        return input_list.index(command_type)
    except ValueError:
        # list.index signals "not found" with ValueError; map it to the -1 sentinel.
        return -1

def process_data(row):
    """Translate a review's title and message from Portuguese to English.

    Sends every non-empty field of ``row`` to the Azure Translator REST API
    (``/translate``, api-version 3.0) in a single request. Intended to be
    wrapped as a Spark UDF whose return schema matches the returned dict.

    Args:
        row: mapping-like record (e.g. a Spark ``Row``) exposing
            ``review_comment_title`` and ``review_comment_message``.

    Returns:
        dict with keys ``status_code`` (HTTP status, or None when no request
        was sent or no response was received), ``processed_at`` (UTC
        timestamp), ``review_comment_title_translated``,
        ``review_comment_message_tr_translated`` and ``error`` (None on
        success; otherwise the exception text, with both translations None).

    Raises:
        RuntimeError: if the ``AZURE_TRANSLATOR_KEY`` environment variable is
            not set. This is a configuration error, so it fails the job
            instead of being recorded as a per-row error on every row.
    """
    import os  # local import keeps the UDF self-contained when shipped to executors

    # The subscription key must not live in the notebook. Provide it through the
    # AZURE_TRANSLATOR_KEY environment variable; on Databricks a cluster
    # environment variable can reference a secret, e.g.
    # AZURE_TRANSLATOR_KEY={{secrets/<scope>/<key>}}.
    # SECURITY: the key previously hardcoded here was committed with the notebook
    # and must be considered leaked and rotated.
    key = os.environ.get("AZURE_TRANSLATOR_KEY")
    if not key:
        raise RuntimeError("AZURE_TRANSLATOR_KEY environment variable is not set")

    endpoint = "https://api.cognitive.microsofttranslator.com"
    location = "southeastasia"
    constructed_url = endpoint + '/translate'
    # Connect/read timeout in seconds; without one a stalled call blocks the Spark task forever.
    timeout_seconds = 30

    params = {
        'api-version': '3.0',
        'from': 'pt',
        'to': ['en']
    }

    headers = {
        'Ocp-Apim-Subscription-Key': key,
        # location required if you're using a multi-service or regional (not global) resource.
        'Ocp-Apim-Subscription-Region': location,
        'Content-type': 'application/json',
        'X-ClientTraceId': str(uuid.uuid4())
    }

    # Send only fields that contain text. `tracker` records which field each body
    # element came from so the response items can be matched back by position.
    tracker = [
        field for field in ('review_comment_title', 'review_comment_message')
        if not is_null_or_empty(row[field])
    ]
    body = [{'text': row[field]} for field in tracker]

    if not body:
        # Nothing to translate: skip the network round trip entirely.
        return {"status_code": None,
                "processed_at": datetime.now(timezone.utc),
                "review_comment_title_translated": None,
                "review_comment_message_tr_translated": None, "error": None}

    response = None  # stays None if the request itself fails (DNS, timeout, ...)
    try:
        response = requests.post(constructed_url, params=params, headers=headers, json=body,
                                 timeout=timeout_seconds)
        response.raise_for_status()
        response_content = response.json()

        review_comment_title_translated = None
        review_comment_title_index = find_index_in_list(tracker, 'review_comment_title')
        if review_comment_title_index != -1:
            review_comment_title_translated = response_content[review_comment_title_index]['translations'][0]['text']

        review_comment_message_tr_translated = None
        review_comment_message_tr_index = find_index_in_list(tracker, 'review_comment_message')
        if review_comment_message_tr_index != -1:
            review_comment_message_tr_translated = response_content[review_comment_message_tr_index]['translations'][0]['text']

        return {"status_code": response.status_code,
                "processed_at": datetime.now(timezone.utc),
                "review_comment_title_translated": review_comment_title_translated,
                "review_comment_message_tr_translated": review_comment_message_tr_translated, "error": None}
    except Exception as e:
        # Best effort per row: record the failure rather than failing the whole Spark job.
        return {"status_code": response.status_code if response is not None else None,
                "processed_at": datetime.now(timezone.utc), "review_comment_title_translated": None,
                "review_comment_message_tr_translated": None, "error": str(e)}

In [0]:
# Configuration and handles used by the chunked translation loop.
source_table = 'default.reviews_to_translate_copy'
destination_table = 'default.reviews_to_translate_copy_result'
chunk_size = 5  # rows translated and merged per loop iteration

# Spark return type of process_data; field names mirror the keys of the dict it returns.
translation_result_schema = StructType([
    StructField("status_code", IntegerType(), True),
    StructField("processed_at", TimestampType(), True),
    StructField("review_comment_title_translated", StringType(), True),
    StructField("review_comment_message_tr_translated", StringType(), True),
    StructField("error", StringType(), True),
])
call_process_data_udf = udf(process_data, translation_result_schema)

source_df = spark.read.table(source_table)
row_count = source_df.count()
print("Number of rows in the DataFrame: ", row_count)

destination_delta_table = DeltaTable.forName(spark, destination_table)
destination_df = spark.read.table(destination_table)

Number of rows in the DataFrame:  1000


In [0]:
# Translate pending rows chunk by chunk and MERGE the results back into the
# destination table. Rows already flagged as processed are skipped, so
# re-running this cell resumes an interrupted run.

# UDF result fields copied into the destination table.
result_fields = (
    "status_code",
    "processed_at",
    "review_comment_title_translated",
    "review_comment_message_tr_translated",
    "error",
)

is_processed_available = "processed" in destination_df.columns
pending_df = destination_df
if is_processed_available:
    pending_df = destination_df.filter(f.col("processed").isNull() | (f.col("processed") == False))

df_count = pending_df.count()
num_chunks = (df_count + chunk_size - 1) // chunk_size
counter = 0

for i in range(num_chunks):
    if is_processed_available:
        # No OFFSET needed: the previous MERGE flagged the last chunk as processed,
        # so the first pending rows are always the next ones to translate.
        chunk_df = spark.sql(
            f"SELECT * FROM {destination_table} where (processed is null or processed = false) order by review_id LIMIT {chunk_size}")
    else:
        offset = i * chunk_size
        chunk_df = spark.sql(
            f"SELECT * FROM {destination_table} order by review_id LIMIT {chunk_size} OFFSET {offset}")

    if chunk_df.rdd.isEmpty():
        # Nothing left to translate; any further iterations would be no-ops.
        break

    # Call the translator once per row and materialize the result before the
    # MERGE. Delta MERGE can scan its source more than once; without this the
    # UDF (a paid API call) could run again, and the LIMIT query over the table
    # being updated could be re-evaluated after it has changed. Materializing
    # before extracting the struct fields also stops Spark from possibly
    # invoking the UDF once per extracted field.
    resultant_df = chunk_df.withColumn(
        "Result", call_process_data_udf(struct([chunk_df[x] for x in chunk_df.columns]))
    ).localCheckpoint(eager=True)
    for field in result_fields:
        resultant_df = resultant_df.withColumn(field, f.col(f"Result.{field}"))

    # review_id is treated as the primary key to match translated rows back to
    # the destination table.
    # NOTE(review): MERGE fails if two source rows in one chunk share a
    # review_id — confirm review_id is unique in this table.
    update_set = {field: f.col(f"updates.{field}") for field in result_fields}
    update_set["processed"] = lit(True)

    # Re-acquire the Delta table handle for each chunk instead of reusing one
    # created in an earlier cell.
    destination_delta_table = DeltaTable.forName(spark, destination_table)
    (
        destination_delta_table.alias('inputs')
        .merge(resultant_df.alias('updates'), 'inputs.review_id = updates.review_id')
        .whenMatchedUpdate(set=update_set)
        .execute()
    )

    # Count rows actually merged; the last chunk may be smaller than chunk_size.
    counter += resultant_df.count()
    print(f"processed: {counter}")

processed: 5
processed: 10
processed: 15
processed: 20
processed: 25
processed: 30
processed: 35
processed: 40
processed: 45
processed: 50
processed: 55
processed: 60
processed: 65
processed: 70
processed: 75
processed: 80
processed: 85
processed: 90
processed: 95
processed: 100
processed: 105
processed: 110
processed: 115
processed: 120
processed: 125
processed: 130
processed: 135
processed: 140
processed: 145
processed: 150
processed: 155
processed: 160
processed: 165
processed: 170
processed: 175
processed: 180
processed: 185
processed: 190
processed: 195
processed: 200
processed: 205
processed: 210
processed: 215
processed: 220
processed: 225
processed: 230
processed: 235
processed: 240
processed: 245
processed: 250
processed: 255
processed: 260
processed: 265
processed: 270
processed: 275
processed: 280
processed: 285
processed: 290
processed: 295
processed: 300
processed: 305
processed: 310
processed: 315
processed: 320
processed: 325
processed: 330
processed: 335
processed: 340


In [0]:
# Publish the translated reviews to the gold layer, replacing any previous export.
gold_path = "/mnt/olist-gold/reviews_eng"
destination_df = spark.read.table(destination_table)
destination_df.write.format("delta").mode("overwrite").save(gold_path)

In [0]:
# Read the gold export back as a sanity check: report its row count and preview it.
df = spark.read.load("/mnt/olist-gold/reviews_eng", format='delta')
row_count = df.count()
print("Number of rows in the DataFrame: ", row_count)
display(df)
