In [None]:
import time

# Import necessary packages for spark, and processing the data with spark

from pyspark.sql import SparkSession
from pyspark.sql.functions import trim, regexp_replace, lower, explode, trim, split, lit
from pyspark.sql.types import StructType, StructField, StringType

# In some cases we need to differentiate pyspark functions from built in functions

import pyspark.sql.functions as py_func

In [None]:
# Build (or reuse) the Spark session that every later cell relies on.
# Executors are allocated dynamically and the application is capped at
# 8 cores with 2048 MB of memory per executor.

spark = (
    SparkSession.builder
    .master("spark://192.168.2.97:7077")
    .appName("Question-2-Final")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.shuffle.service.enabled", True)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.executor.memory", "2048m")
    .config("spark.cores.max", 8)
    .getOrCreate()
)

In [None]:
# Wall-clock reference point; the elapsed-time cell near the end of the
# notebook subtracts this from its own time.time() reading.
start_time = time.time()

In [None]:
# Function for parsing the body of the json

def parse_json_body(file):
    """Read a JSON file and return its ``body`` text as one word per row.

    Each ``body`` is lowercased, stripped of the punctuation/symbol characters
    listed below, split on runs of whitespace and exploded into rows.

    Parameters
    ----------
    file : str
        Path or URI passed to ``spark.read.json``; the records are expected
        to carry a ``body`` field.

    Returns
    -------
    pyspark.sql.DataFrame
        A single-column DataFrame (``word``) with one row per non-empty token.
    """
    # Only "body" is used, so declare it up front: without an explicit schema
    # spark.read.json makes an extra pass over the input just to infer one.
    body_schema = StructType([StructField("body", StringType(), True)])
    df = spark.read.schema(body_schema).json(file)

    # Lowercase the body
    df_lowercase_body = df.select(lower("body").alias("lowercase_body"))

    # Character class of the symbols to delete from the body. Written as a raw
    # string so the regex escapes reach the regex engine untouched instead of
    # relying on Python leaving invalid escapes such as "\." in place.
    characters_to_delete = r"""[\.,\[\]\(\):_\-!?'\+=;/&{}@$#*"\\%><|~¨´¤]"""

    # Remove those characters from the body
    df_clean = df_lowercase_body.withColumn(
        "lowercase_body",
        regexp_replace(df_lowercase_body.lowercase_body, characters_to_delete, ""),
    )

    # Split the body on whitespace and make a row for each word
    df_words = df_clean.select(
        explode(split("lowercase_body", r"\s+")).alias("word")
    )

    # Leading/trailing whitespace (or a body that is empty after cleaning)
    # makes split() emit "" tokens; those are not words, so drop them.
    return df_words.filter(py_func.col("word") != "")

In [None]:
# The function that retrieves the data from HDFS and preprocesses it

def get_word_count_and_df_all_years(years, base_path="hdfs://192.168.2.97:50000/user/ubuntu"):
    """Load twelve monthly dumps per year and count the words of each year.

    For every entry of ``years`` the files ``RC_20<year>-<month>.json``
    (months ``01`` to ``12``) under ``base_path`` are parsed with
    ``parse_json_body`` and unioned into one DataFrame of words.

    Parameters
    ----------
    years : iterable of str
        Two-digit year suffixes, e.g. ``["06", "07"]``.
    base_path : str, optional
        Directory (or URI prefix) that holds the monthly files. Defaults to
        the HDFS location this notebook was written against.

    Returns
    -------
    tuple
        ``(word_count, df_all_years)`` where ``word_count`` maps
        ``"20<year>"`` to a DataFrame with columns ``word`` and ``count``
        ordered by descending count, and ``df_all_years`` is the union of the
        words of all requested years (column ``word``).
    """
    schema = StructType([StructField("word", StringType(), True)])
    months = [f"{month_number:02d}" for month_number in range(1, 13)]
    base = base_path.rstrip("/")

    word_count = {}
    df_all_years = spark.createDataFrame([], schema)

    for year in years:
        # String concatenation on purpose: a non-string year fails here
        # instead of silently producing a wrong file name.
        year_label = "20" + year

        # Append all the months of this year to one DataFrame
        df_current_year = spark.createDataFrame([], schema)
        for month in months:
            month_path = f"{base}/RC_{year_label}-{month}.json"
            df_current_year = df_current_year.union(parse_json_body(month_path))

        # Count the words and sort them by descending frequency
        word_count[year_label] = (
            df_current_year.groupBy("word").count().orderBy("count", ascending=False)
        )

        # Add the current year to the all-years DataFrame
        df_all_years = df_all_years.union(df_current_year)

    return word_count, df_all_years

In [None]:
# Two-digit year suffixes ("07" means 2007) of the dumps to process.
# Full run: years = ["06", "07", "08", "09", "10"]
years = ["07"] # single year, used for the timing test

# Get the per-year word_count dictionary; the combined all-years DataFrame
# returned alongside it is not used in this notebook and is discarded.
word_count, _ = get_word_count_and_df_all_years(years)

In [None]:
# The words we are going to analyze
search_words = [
    "football", "code", "bush", "blog", "vote", "obama", "market", "rights",
    "science", "information", "woman", "lol", "iphone", "bitcoin", "instagram",
    "facebook", "spotify", "sweden", "wikileaks", "torrent", "man", "twitter",
]

# One row per search word. The placeholder "count" column is dropped after
# the first join; every year then contributes a "sum" and a "%" column.
all_words_df = spark.createDataFrame([(word, 0) for word in search_words], ["word", "count"])

# Per-year aggregates that were cached, so they can be released afterwards
cached_year_dfs = []

for year in years:
    year_label = "20" + year
    sum_col = "sum " + year_label
    pct_col = "% " + year_label

    # Keep only the rows of this year's word count that are search words.
    # Indexing (rather than .get) makes a missing year fail with a KeyError.
    filtered_df = word_count[year_label].filter(py_func.col("word").isin(search_words))

    # Occurrences per search word (at most len(search_words) rows). Cached
    # because two actions consume it - the collect() below and the final
    # show() - and without the cache the year's files are processed twice.
    year_counts_df = filtered_df.groupBy("word").agg(py_func.sum("count").alias(sum_col)).cache()
    cached_year_dfs.append(year_counts_df)

    # Total occurrences of all search words in this year; None when none of
    # them occurs at all.
    total_count = year_counts_df.select(py_func.sum(sum_col)).collect()[0][0]

    # Share of each word among the search-word occurrences of the year (in
    # percent, one decimal). Note this is relative to the search words only,
    # not to every word of the corpus.
    if total_count:
        pct_expr = py_func.round(py_func.col(sum_col) / (total_count / 100), 1)
    else:
        # No search word found: the frame is empty, avoid dividing None
        pct_expr = py_func.lit(0.0)
    word_count_df = year_counts_df.withColumn(pct_col, pct_expr)

    # Left outer join so every search word keeps its row; words that do not
    # occur this year get 0 for both the sum and the percentage.
    all_words_df = (
        all_words_df.join(word_count_df, ["word"], "left_outer")
        .na.fill({sum_col: 0, pct_col: 0.0})
        .drop("count")
    )

# Show the result
all_words_df.orderBy("word").show(len(search_words))

# Release the cached per-year aggregates now that the result is displayed
for cached_df in cached_year_dfs:
    cached_df.unpersist()


In [None]:
# Report the wall-clock time elapsed since start_time was recorded
end_time = time.time()
print(f"Elapsed time: {end_time - start_time} seconds")

In [None]:
# Stop the Spark session now that the notebook is finished with it
spark.stop()