In [1]:
import sys
# Make the project's local packages importable (presumably `utils` and `operators`
# live under ./work/imcp). The path is relative to the kernel's working directory.
sys.path.append("./work/imcp")

from utils.config import get_settings
from operators.streaming import SparkStreaming
import pyspark.sql.functions as F

# Create a Spark session with the project's `SparkStreaming` helper class

In [2]:
# Load project settings (used below for the MongoDB Atlas connection URI), then
# obtain the Spark session through the project's SparkStreaming helper.
settings = get_settings()
spark = SparkStreaming.get_instance(
    app_name="Spark Streaming",
)

In [20]:
# Show the session object as the cell output (presumably a SparkSession, which
# renders a summary of the running Spark app - confirm against SparkStreaming).
spark

# Check the Spark MongoDB connector

## Read batch data into a Spark DataFrame

In [3]:
# Batch-read the `audit` collection of the `imcp` database through the Spark
# MongoDB connector; the connection URI comes from the project settings.
df = (
    spark.read.format("mongodb")
    .option("spark.mongodb.read.connection.uri", settings.MONGODB_ATLAS_URI)
    .option("spark.mongodb.read.database", "imcp")
    .option("spark.mongodb.read.collection", "audit")
    .load()
)

In [6]:
# printSchema() writes the schema to stdout itself and returns None, so it is
# called directly; wrapping it in print() emitted a stray "None" line.
df.printSchema()
# No filter is applied to `df`, so this is the total number of documents read.
print("The number of rows:", df.count())

root
 |-- _id: string (nullable = true)
 |-- caption: string (nullable = true)
 |-- caption_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- short_caption: string (nullable = true)
 |-- url: string (nullable = true)

None
The number of filterd rows:  10


## Write batch data into MongoDB

In [7]:
def write_to_mongodb(frame, collection="audit", database="imcp",
                     batch_size=10000, uri=None):
    """Append a Spark DataFrame to a MongoDB collection via the Spark MongoDB connector.

    Args:
        frame: DataFrame to write (any object exposing a ``.write`` DataFrameWriter).
        collection: Target collection name.
        database: Target database name.
        batch_size: Value passed as ``spark.mongodb.write.batch.size``.
        uri: Connection URI; when None, ``settings.MONGODB_ATLAS_URI`` is used.
    """
    if uri is None:
        uri = settings.MONGODB_ATLAS_URI
    (
        frame.write.format("mongodb")
        .option("spark.mongodb.write.connection.uri", uri)
        .option("spark.mongodb.write.database", database)
        .option("spark.mongodb.write.collection", collection)
        .option("spark.mongodb.write.batch.size", str(batch_size))
        .mode("append")
        .save()
    )


# `processed_df` is not created by any cell above this one, so on a fresh kernel
# the original cell failed with NameError. Skip the write with an explicit message
# instead of aborting "Run All".
# NOTE(review): "append" into the same `audit` collection that is read above may
# insert duplicate documents if this cell is re-run - confirm this is intended.
if "processed_df" in globals():
    write_to_mongodb(processed_df)
else:
    print("Skipping MongoDB write: `processed_df` is not defined yet; "
          "build it in the processing section and re-run this cell.")

## Process the IMCP dataset

In [20]:
# Load the caption/URL dataset from the lakehouse, drop `short_caption`, and
# de-duplicate on the remaining columns (dropDuplicates() with no subset).
# NOTE(review): the name `traffic_df` is kept because later cells use it, although
# the path suggests LVIS caption data rather than traffic data.
LVIS_CAPTIONS_PATH = "s3a://lakehouse/imcp/parquets/lvis_caption_url.parquet"

traffic_df = (
    spark.read
    .parquet(LVIS_CAPTIONS_PATH)
    .drop("short_caption")
    .dropDuplicates()
)

traffic_df.printSchema()

root
 |-- url: string (nullable = true)
 |-- caption: string (nullable = true)



In [21]:
# Row count after de-duplication, followed by a 5-row tabular preview.
n_rows = traffic_df.count()
print(n_rows)
traffic_df.show(n=5)

217867
+--------------------+--------------------+
|                 url|             caption|
+--------------------+--------------------+
|http://images.coc...|a city street wit...|
|http://images.coc...|three women weari...|
|http://images.coc...|a young boy stand...|
|http://images.coc...|an indoor setting...|
|http://images.coc...|a horse and rider...|
+--------------------+--------------------+
only showing top 5 rows



In [15]:
# Peek at a single record as a one-element list of Row objects.
traffic_df.head(1)

[Row(url='http://images.cocodataset.org/val2017/000000037777.jpg', caption='a kitchen with wooden cabinets on the walls, a stove, multiple drawers, a refrigerator, a counter with fruits, and a well-organized layout for cooking and storage needs.', short_caption='Well-organized kitchen with wooden cabinets, a stove, multiple drawers, a refrigerator, counter space with fruits, and a clutter-free layout for efficient cooking and storage needs.')]

In [19]:
# Normalize captions and tokenize them:
# 1. Remove every character that is not an ASCII letter, digit, or whitespace
#    (this also joins hyphenated words, e.g. "button-up" -> "buttonup").
# 2. Split on runs of whitespace and drop empty strings. Splitting on a single " "
#    produced "" tokens wherever step 1 left consecutive, leading, or trailing spaces
#    (e.g. after removing a standalone " - ").
# The result is named `processed_df`, the frame the MongoDB write cell refers to.
processed_df = (
    traffic_df
    .withColumn("caption", F.regexp_replace(F.col("caption"), "[^a-zA-Z0-9\\s]", ""))
    .withColumn(
        "caption_tokens",
        F.array_remove(F.split(F.col("caption"), "\\s+"), ""),
    )
)
processed_df.take(1)

[Row(url='http://images.cocodataset.org/val2017/000000400573.jpg', caption='a person outdoors in a park or a similar open space with trees in the background wearing a jacket and a buttonup shirt underneath holding a doughnut and enjoying the moment The person is wearing glasses and appears to be in a casual mood The doughnut is likely sweet due to its glaze and there are no other people or animals visible close to the person but distant figures can be seen in the background', short_caption='A person enjoys a doughnut in a park, wearing a button-up and glasses, casually alone, with distant figures visible in the background.', caption_tokens=['a', 'person', 'outdoors', 'in', 'a', 'park', 'or', 'a', 'similar', 'open', 'space', 'with', 'trees', 'in', 'the', 'background', 'wearing', 'a', 'jacket', 'and', 'a', 'buttonup', 'shirt', 'underneath', 'holding', 'a', 'doughnut', 'and', 'enjoying', 'the', 'moment', 'The', 'person', 'is', 'wearing', 'glasses', 'and', 'appears', 'to', 'be', 'in', 'a',

In [21]:
# Stop the Spark session at the end of the notebook to release cluster resources.
spark.stop()