In [None]:
import os

# Machine-specific install locations for Spark, the JDK and the Hadoop/YARN client config.
# NOTE(review): absolute paths for this host — adjust when running elsewhere.
SPARK_ENV = {
    "SPARK_HOME": "/home/ubuntu/spark/",
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "HADOOP_CONF_DIR": "/usr/local/hadoop/etc/hadoop/",
}

# Export these before findspark.init() / the Spark JVM start so both see them.
os.environ.update(SPARK_ENV)

In [None]:
# findspark.init() uses SPARK_HOME (exported in the first cell) to make PySpark importable,
# so it has to run before `import pyspark`; do not reorder these lines.
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
# Star import: the notebook relies on it for configure_spark_with_delta_pip.
from delta import *
# NOTE(review): pd and time are not referenced anywhere else in the visible notebook.
import pandas as pd
import time

In [None]:
# Build a YARN-backed SparkSession with the Delta Lake SQL extension and catalog enabled.
builder = (
    SparkSession.builder.appName("Sentiment140")
    .master('yarn')
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# configure_spark_with_delta_pip (star-imported from `delta`) adds the Delta Lake jar that
# matches the installed delta-spark package to the builder before the session starts.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Disable logging
import logging  # NOTE(review): not used in the visible notebook.

# Turn off the driver JVM's log4j loggers for Spark ("org.*") and Akka.
logger = spark._jvm.org.apache.log4j
logger.LogManager.getLogger("org").setLevel(logger.Level.OFF)
logger.LogManager.getLogger("akka").setLevel(logger.Level.OFF)

# Setting "spark.driver.log.level" through spark.conf on a running session has no effect.
# SparkContext.setLogLevel is the supported way to change the root log level at runtime.
# NOTE(review): "OFF" also hides errors; "ERROR" may be a safer choice while debugging.
spark.sparkContext.setLogLevel("OFF")

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
from pyspark.sql.functions import to_timestamp, col

# Explicit schema for the headerless CSV input; columns are mapped positionally in this order.
# `timestamp` is kept as a raw string (see the disabled conversion below).
schema = StructType([
    StructField("label", IntegerType(), True),
    StructField("id", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("query", StringType(), True),
    StructField("username", StringType(), True),
    StructField("text", StringType(), True)
])

df = spark.read.csv("/dis_materials/csv_output1", header=False, schema=schema)

# Transform text variables to proper types.
# NOTE(review): disabled. Confirm the pattern parses under Spark 3's datetime-pattern rules
# before enabling; day-of-week letters such as 'EEE' are restricted when parsing.
# df = df.withColumn("timestamp", to_timestamp(col("timestamp"), "EEE MMM dd HH:mm:ss z yyyy"))

# Cache the DataFrame, which the count/show/describe and TF-IDF cells reuse.
df.cache() # Caching intermediate DataFrames - Optimization1

In [None]:
# Get the number of rows and columns
# Dataset shape: count() runs a Spark job, while the column count comes from the schema.
num_rows, num_cols = df.count(), len(df.columns)
print(f"There are {num_rows:,} rows and {num_cols} columns.\n")

# Preview: the first five rows.
df.show(5)

# Summary statistics (count, mean, stddev, min, max) for a subset of the columns.
summary_cols = ['label', 'timestamp', 'query', 'text']
df.select(*summary_cols).describe().show()

In [None]:
from pyspark.ml.feature import Tokenizer, HashingTF, IDF

# TF-IDF featurisation of the tweet text:
#   text --Tokenizer--> words --HashingTF--> raw_features --IDF--> features
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashing_tf = HashingTF(inputCol="words", outputCol="raw_features")
idf = IDF(inputCol="raw_features", outputCol="features")

# Stateless transformers: split the text into tokens, then hash the tokens into term-count vectors.
words_data = tokenizer.transform(df)
tf_data = hashing_tf.transform(words_data)

# IDF is an estimator. Fit document frequencies on tf_data, then rescale the raw counts.
idf_model = idf.fit(tf_data)
tfidf_data = idf_model.transform(tf_data)

tfidf_data.show(truncate=False)

# Optimization 2: reduce to at most 10 partitions (coalesce avoids a full shuffle).
tfidf_data = tfidf_data.coalesce(10)

In [None]:
# Write the results (including TF-IDF features) to a Delta table
# Write the results (including TF-IDF features) to a Delta table with one row per tweet `id`.
from delta.tables import DeltaTable

delta_table_path = "hdfs:///dis_materials/group15_delta"

# Deduplicate on `id` *before* writing. Merging a DataFrame into a table that was just
# overwritten with that same DataFrame cannot remove duplicates, because every row only
# matches itself. And if an `id` occurs more than once, Delta's MERGE aborts because several
# source rows match the same target row. For a repeated `id`, which row dropDuplicates
# keeps is not specified.
# dropDuplicates introduces a shuffle, so re-apply the upstream partition count to keep
# the number of written files bounded (preserves the earlier coalesce optimization).
deduped_data = (
    tfidf_data
    .dropDuplicates(["id"])
    .coalesce(tfidf_data.rdd.getNumPartitions())
)

deduped_data.write.format("delta").mode("overwrite").option("mergeSchema", "true").save(delta_table_path)

# Handle on the written table, kept for inspection or later incremental upserts.
delta_table = DeltaTable.forPath(spark, delta_table_path)

In [None]:
# Stop the Spark session
# Stop the SparkSession and its underlying SparkContext, releasing the YARN application.
# Re-run the session-building cell before executing any further Spark code.
spark.stop()