In [40]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, lit
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, ArrayType
import re
from datetime import datetime
import json

In [41]:
# Load job settings (including the MongoDB connection string) from the shared
# Airflow env config so no credentials are hardcoded in the notebook.
with open("../airflow/config/env.json", "r") as config_file:
    config = json.load(config_file)
mongo_url = config['mongodb']['MONGO_ATLAS_PYTHON_GCP']

# Spark settings for this job: the MongoDB Spark connector package plus
# memory / timeout tuning for the driver and executors.
SPARK_CONF = {
    "spark.jars.packages": "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1",
    "spark.driver.maxResultSize": "1g",
    "spark.network.timeout": "300s",
    "spark.executor.heartbeatInterval": "120s",
    "spark.executor.memory": "4g",
    "spark.driver.memory": "2g",
}

# Build (or reuse an already-active) SparkSession with the settings above.
spark_builder = SparkSession.builder.appName("Normalize data")
for conf_key, conf_value in SPARK_CONF.items():
    spark_builder = spark_builder.config(conf_key, conf_value)
spark = spark_builder.getOrCreate()

# Batch-read the bronze-layer collection from MongoDB through the Mongo Spark
# connector. This uses spark.read (not readStream), so it is a one-shot batch
# load, not a streaming query.
bronze_df = (
    spark.read.format("com.mongodb.spark.sql.DefaultSource")
    .option('spark.mongodb.input.uri', mongo_url)
    .option('spark.mongodb.input.database', 'imcp')
    .option('spark.mongodb.input.collection', 'bronze_layer')
    .load()
)

# printSchema() writes the schema to stdout itself and returns None, so call
# it bare; wrapping it in print() would emit a stray "None".
bronze_df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- caption: string (nullable = true)
 |-- created_time: timestamp (nullable = true)
 |-- howpublished: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- short_caption: string (nullable = true)
 |-- url: string (nullable = true)
 |-- year: string (nullable = true)

None


In [42]:
# bronze_df.write.format("com.mongodb.spark.sql.DefaultSource") \
#             .option('spark.mongodb.output.uri', mongo_url) \
#             .option('spark.mongodb.output.database', 'imcp') \
#             .option('spark.mongodb.output.collection', 'refined') \
#             .mode('append') \
#             .save()

In [43]:
# Lower-case both free-text caption columns; every other column passes through.
temp_lwc = bronze_df
for column_name in ('caption', 'short_caption'):
    temp_lwc = temp_lwc.withColumn(column_name, lower(col(column_name)))
temp_lwc.take(5)

[Row(_id=Row(oid='66df138714a70899133d85b3'), caption='a kitchen with wooden cabinets on the walls, a stove, multiple drawers, a refrigerator, a counter with fruits, and a well-organized layout for cooking and storage needs.', created_time=datetime.datetime(2024, 9, 10, 5, 25, 56, 734000), howpublished='https://huggingface.co/datasets/laion/220k-GPT4Vision-captions-from-LIVIS', publisher='HuggingFace', short_caption='well-organized kitchen with wooden cabinets, a stove, multiple drawers, a refrigerator, counter space with fruits, and a clutter-free layout for efficient cooking and storage needs.', url='http://images.cocodataset.org/val2017/000000037777.jpg', year='2023'),
 Row(_id=Row(oid='66df138714a70899133d85b4'), caption='a street scene with construction scaffolding, three individuals, a shopping cart filled with personal belongings, street signs, and a sidewalk. the construction scaffolding is blue and has text about the construction company and contact details. one individual is 

In [44]:
def remove_punc(text):
    """Strip every character that is not an ASCII letter or whitespace.

    Digits, punctuation and non-ASCII letters are all removed. Hyphenated
    words are therefore joined (e.g. "well-organized" -> "wellorganized"),
    and a removed standalone token can leave a double space behind.

    Args:
        text: The string to clean, or None (a null column value arrives in a
            Python UDF as None).

    Returns:
        The filtered string, or None when ``text`` is None. Null captions
        then stay null instead of raising TypeError inside the UDF and
        failing the whole Spark job.
    """
    if text is None:
        return None
    return re.sub(r'[^a-zA-Z\s]', '', text)

# Expose the pure-Python cleaner to Spark as a string-returning UDF.
remove_punc_udf = F.udf(remove_punc, StringType())

# Apply the same punctuation/digit stripping to both caption columns.
temp_rmp = temp_lwc
for caption_col in ['caption', 'short_caption']:
    temp_rmp = temp_rmp.withColumn(caption_col, remove_punc_udf(col(caption_col)))
temp_rmp.take(5)

[Row(_id=Row(oid='66df138714a70899133d85b3'), caption='a kitchen with wooden cabinets on the walls a stove multiple drawers a refrigerator a counter with fruits and a wellorganized layout for cooking and storage needs', created_time=datetime.datetime(2024, 9, 10, 5, 25, 56, 734000), howpublished='https://huggingface.co/datasets/laion/220k-GPT4Vision-captions-from-LIVIS', publisher='HuggingFace', short_caption='wellorganized kitchen with wooden cabinets a stove multiple drawers a refrigerator counter space with fruits and a clutterfree layout for efficient cooking and storage needs', url='http://images.cocodataset.org/val2017/000000037777.jpg', year='2023'),
 Row(_id=Row(oid='66df138714a70899133d85b4'), caption='a street scene with construction scaffolding three individuals a shopping cart filled with personal belongings street signs and a sidewalk the construction scaffolding is blue and has text about the construction company and contact details one individual is walking by another pe

In [45]:
def tokenize(text):
    """Split a caption into whitespace-delimited tokens.

    str.split() with no separator treats any run of whitespace (spaces, tabs,
    newlines) as one delimiter and ignores leading/trailing whitespace. This
    avoids the empty-string tokens that text.split(" ") produced wherever
    punctuation/digit stripping left double or leading spaces.

    Args:
        text: The caption string, or None for a null column value.

    Returns:
        A list of tokens, or None when ``text`` is None. A null caption then
        yields a null token array instead of raising AttributeError.
    """
    if text is None:
        return None
    return text.split()


tokenize_udf = F.udf(tokenize, ArrayType(StringType()))

# Add a token-array column alongside each cleaned caption column.
temp_tok = temp_rmp
token_columns = [
    ('caption', 'caption_tokens'),
    ('short_caption', 'short_caption_tokens'),
]
for source_col, tokens_col in token_columns:
    temp_tok = temp_tok.withColumn(tokens_col, tokenize_udf(col(source_col)))
temp_tok.take(5)

[Row(_id=Row(oid='66df138714a70899133d85b3'), caption='a kitchen with wooden cabinets on the walls a stove multiple drawers a refrigerator a counter with fruits and a wellorganized layout for cooking and storage needs', created_time=datetime.datetime(2024, 9, 10, 5, 25, 56, 734000), howpublished='https://huggingface.co/datasets/laion/220k-GPT4Vision-captions-from-LIVIS', publisher='HuggingFace', short_caption='wellorganized kitchen with wooden cabinets a stove multiple drawers a refrigerator counter space with fruits and a clutterfree layout for efficient cooking and storage needs', url='http://images.cocodataset.org/val2017/000000037777.jpg', year='2023', caption_tokens=['a', 'kitchen', 'with', 'wooden', 'cabinets', 'on', 'the', 'walls', 'a', 'stove', 'multiple', 'drawers', 'a', 'refrigerator', 'a', 'counter', 'with', 'fruits', 'and', 'a', 'wellorganized', 'layout', 'for', 'cooking', 'and', 'storage', 'needs'], short_caption_tokens=['wellorganized', 'kitchen', 'with', 'wooden', 'cabin

In [46]:
# Stamp every row with the driver's current time (captured once, when this
# cell runs), then keep only the columns that belong in the refined layer.
final_df = (
    temp_tok
    .withColumn('created_time', lit(datetime.now()))
    .select(
        'url',
        'caption',
        'short_caption',
        'caption_tokens',
        'short_caption_tokens',
        'publisher',
        'created_time',
    )
)
final_df.take(5)

[Row(url='http://images.cocodataset.org/val2017/000000037777.jpg', caption='a kitchen with wooden cabinets on the walls a stove multiple drawers a refrigerator a counter with fruits and a wellorganized layout for cooking and storage needs', short_caption='wellorganized kitchen with wooden cabinets a stove multiple drawers a refrigerator counter space with fruits and a clutterfree layout for efficient cooking and storage needs', caption_tokens=['a', 'kitchen', 'with', 'wooden', 'cabinets', 'on', 'the', 'walls', 'a', 'stove', 'multiple', 'drawers', 'a', 'refrigerator', 'a', 'counter', 'with', 'fruits', 'and', 'a', 'wellorganized', 'layout', 'for', 'cooking', 'and', 'storage', 'needs'], short_caption_tokens=['wellorganized', 'kitchen', 'with', 'wooden', 'cabinets', 'a', 'stove', 'multiple', 'drawers', 'a', 'refrigerator', 'counter', 'space', 'with', 'fruits', 'and', 'a', 'clutterfree', 'layout', 'for', 'efficient', 'cooking', 'and', 'storage', 'needs'], publisher='HuggingFace', created_ti

In [47]:
# refined_data = temp.toDF().randomSplit([1/1000]*1000)
# batch = refined_data[0]

In [None]:
# Append the refined rows to imcp.refined. Mode is 'append', so re-running
# this cell inserts the rows again.
refined_write_options = {
    'spark.mongodb.output.uri': mongo_url,
    'spark.mongodb.output.database': 'imcp',
    'spark.mongodb.output.collection': 'refined',
}
(
    final_df.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .options(**refined_write_options)
    .mode('append')
    .save()
)

In [39]:
spark.stop()

In [52]:
final_df.count()

217868