Dependencies

In [None]:
#Library and objects
#Python 3.11.14

from pyspark.sql import SparkSession #Version 3.4.1
import pyspark.sql.functions as F

from elasticsearch import Elasticsearch #Version 8.9.0
import os

In [None]:
#Spark Session
# Build (or reuse, via getOrCreate) the SparkSession for the pipeline. The
# elasticsearch-spark connector JAR is placed on the classpath so the load
# step can write with the "org.elasticsearch.spark.sql" format.
_spark_settings = {
    "spark.jars": "./spark-jars/elasticsearch-spark-30_2.12-8.9.0.jar",
    "spark.driver.memory": "3g",
    "spark.driver.cores": "2",
    "spark.executor.instances": "1",
    "spark.executor.memory": "6g",
    "spark.executor.cores": "4",
}

_spark_builder = SparkSession.builder.appName("movie_indexing_pipeline")
for _setting_key, _setting_value in _spark_settings.items():
    _spark_builder = _spark_builder.config(_setting_key, _setting_value)
spark = _spark_builder.getOrCreate()


In [None]:
#Elasticsearch connection

# Python client for the local Elasticsearch node. es.info() contacts the
# cluster, so connectivity problems surface in this cell first.
es = Elasticsearch("http://127.0.0.1:9200")

try:
    es.info()
except Exception as error:
    # Top-level notebook boundary: report the failure instead of raising.
    # NOTE(review): later cells still use `es` and will fail if this did.
    print(f"Connection failed: {error}.")
else:
    # Only reached when es.info() succeeded.
    print("Successfully connected!")

Creating Elasticsearch Index

In [None]:
# Name of the target Elasticsearch index for this pipeline.
index_name: str = "movies"

In [None]:
# Explicit index mapping: free-text title and synopsis ("sinopse") fields,
# plus a release date stored as a date parsed from "yyyy-MM-dd" strings.
_movie_properties = dict(
    title=dict(type="text"),
    sinopse=dict(type="text"),
    release_date=dict(type="date", format="yyyy-MM-dd"),
)
body = dict(mappings=dict(properties=_movie_properties))

In [None]:
# Create the index with the explicit mapping only when it does not exist yet,
# so re-running the notebook does not fail. An index that already exists is
# left as-is; its mapping is NOT updated to match `body`.
if not es.indices.exists(index=index_name):
    # Unpack the request body into the client's named parameters
    # (currently just `mappings=`) instead of the generic `body=` argument
    # that the 8.x client steers away from.
    es.indices.create(index=index_name, **body)
    print(f"Index '{index_name}' - Create successfully!")
else:
    print(f"Index '{index_name}' - Already exists.")

Auxiliary Functions

In [None]:
def normalize_title_column(spark_dataframe, title_column):
    """Return *spark_dataframe* with *title_column* trimmed and title-cased.

    Leading/trailing whitespace is stripped, then Spark ``initcap`` upper-cases
    the first letter of each word and lower-cases the rest. The result
    replaces the column under the same name.
    """
    cleaned_title = F.initcap(F.trim(F.col(title_column)))
    return spark_dataframe.withColumn(title_column, cleaned_title)

In [None]:
def transform_month_in_full_to_numeric(spark_dataframe, month_column):
    """Replace full English month names in *month_column* with "01".."12".

    Matching ignores case and surrounding whitespace ("january", " March ").
    Any value that is not a recognised month name, including null, becomes
    null. It no longer falls through to "12", which silently turned bad or
    missing months into December. Callers building a date from this column
    should treat null as "month unknown".

    Args:
        spark_dataframe: input DataFrame.
        month_column: name of the column holding the month name; it is
            replaced in place by the two-digit month string.

    Returns:
        A new DataFrame with *month_column* rewritten.
    """
    month_names = (
        "JANUARY", "FEBRUARY", "MARCH", "APRIL", "MAY", "JUNE",
        "JULY", "AUGUST", "SEPTEMBER", "OCTOBER", "NOVEMBER", "DECEMBER",
    )
    normalized_month = F.upper(F.trim(F.col(month_column)))

    numeric_month = None
    for number, name in enumerate(month_names, start=1):
        condition = normalized_month == name
        code = f"{number:02d}"
        numeric_month = (
            F.when(condition, code)
            if numeric_month is None
            else numeric_month.when(condition, code)
        )
    # Deliberately no .otherwise(): unmatched values evaluate to null.
    return spark_dataframe.withColumn(month_column, numeric_month)

In [None]:
def create_full_release_date_column(spark_dataframe, numeric_day_column, numeric_month_column, numeric_year_column):
    """Add a ``release_date`` string column formatted as ``yyyy-MM-dd``.

    The year, month and day columns are joined as ``year-month-day`` and
    normalised through Spark's implicit string-to-timestamp cast inside
    ``date_format``. Unpadded parts such as ``"2020-1-5"`` therefore come
    out as ``"2020-01-05"``.

    ``F.concat`` is used rather than ``concat_ws`` on purpose. ``concat_ws``
    skips null arguments, so a missing day would yield ``"2020-01"``, which
    Spark casts to the first of the month: a silently wrong date. With
    ``concat``, a null in any part makes ``release_date`` null. Strings that
    do not form a valid date also give null under Spark's default (non-ANSI)
    casting.

    Args:
        spark_dataframe: input DataFrame.
        numeric_day_column: name of the day-of-month column.
        numeric_month_column: name of the numeric month column.
        numeric_year_column: name of the year column.

    Returns:
        A new DataFrame with an added (or replaced) ``release_date`` column.
    """
    separator = F.lit("-")
    # concat returns null if any input is null (unlike concat_ws).
    joined_date = F.concat(
        F.col(numeric_year_column), separator,
        F.col(numeric_month_column), separator,
        F.col(numeric_day_column),
    )
    return spark_dataframe.withColumn("release_date", F.date_format(joined_date, "yyyy-MM-dd"))

In [None]:
def select_to_index_columns(spark_dataframe, title_column, sinopse_column, full_release_date_formated_column):
    """Project *spark_dataframe* down to the fields that get indexed.

    The output columns appear in this order: title, synopsis, release date.
    """
    wanted_columns = [title_column, sinopse_column, full_release_date_formated_column]
    return spark_dataframe.select(*[F.col(column_name) for column_name in wanted_columns])

ETL - Extraction

In [None]:
main_data_dir = "./data"
# Input locations under the data directory: a CSV file and a Parquet input
# (presumably a dataset directory, given the missing extension — verify).
csv_file_full_path, parquet_file_full_path = (
    os.path.join(main_data_dir, file_name)
    for file_name in ("CSV-MOVIES.csv", "PARQUET-MOVIES")
)

In [None]:
# First line supplies column names; without inferSchema every column is a string.
csv_file_df = spark.read.option("header", True).csv(csv_file_full_path)

In [None]:
# Load the Parquet input; its schema comes from the Parquet metadata.
parquet_file_df = spark.read.format("parquet").load(parquet_file_full_path)

ETL - Transform

In [None]:
# CSV file transforming
# Steps: clean the title, map month names to numbers, build release_date,
# then keep only the columns that get indexed.
csv_file_df = (
    csv_file_df
    .transform(normalize_title_column, "title")
    .transform(transform_month_in_full_to_numeric, "release_month")
    .transform(create_full_release_date_column, "release_day", "release_month", "release_year")
    .transform(select_to_index_columns, "title", "sinopse", "release_date")
)

In [None]:
# PARQUET file transforming
# Steps: clean the title, map month names to numbers, build release_date,
# then keep only the columns that get indexed.
parquet_file_df = (
    parquet_file_df
    .transform(normalize_title_column, "title")
    .transform(transform_month_in_full_to_numeric, "release_month")
    .transform(create_full_release_date_column, "release_day", "release_month", "release_year")
    .transform(select_to_index_columns, "title", "sinopse", "release_date")
)

In [None]:
# Combine both sources into one DataFrame to index. unionByName matches
# columns by name rather than by position, so a column-order difference
# between the two inputs cannot silently swap fields.
final_df = parquet_file_df.unionByName(csv_file_df)

ETL - Load

In [None]:
# Write the combined DataFrame into Elasticsearch via the elasticsearch-spark
# connector. The target index is taken from `index_name`, the same variable
# used when creating the index, so the two cannot drift apart.
# NOTE(review): no document id column is configured (es.mapping.id), so with
# mode("append") re-running this cell presumably indexes duplicates — confirm.
(
    final_df.write
    .format("org.elasticsearch.spark.sql")
    .option("es.resource", index_name)
    .option("es.nodes", "127.0.0.1")
    .option("es.port", "9200")
    .mode("append")
    .save()
)