Process csv and create Deltatable

In [1]:
import findspark
findspark.init()
from pyspark.context import SparkContext

import pyspark
from delta import *

#Create a session suitable for a delta table
builder = pyspark.sql.SparkSession.builder.appName("MyApp").master('yarn') \
    .config("spark.executor.instances", 12) \
    .config("spark.executor.memory", "1536m") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

#3 nodes with each 4 instances so 12 executors in total, one of which is the driver. 

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# This reduces the amount of logs we recieve, only priting ERROR level or higher.
sc = spark.sparkContext
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )
sc.setLogLevel('ERROR')

#Set schema and load dataframe
from pyspark.sql.types import *
from pyspark.sql.functions import *

#Introduce schema
schema = StructType([
    StructField("ID", IntegerType(), False),
    StructField("title", StringType(), False),
    StructField("date", StringType(), False),
    StructField("categories", StringType(), True),
    StructField("subtitles", StringType(), True),
    StructField("related", StringType(), True),
    StructField("text", StringType(), False)
])

#read in our csv file
df = spark.read.csv("/simplewiki/output_full_2g.csv", sep = ',', header = True, schema = schema)

#fix date column: remove []
df = df.withColumn('title', regexp_replace('title', "[\[\]']", ''))

#fix date column: remove [] and set to datetype
df = df.withColumn('date', regexp_replace('date', "[\[\]']", ''))
df = df.withColumn('date', to_date('date', 'yyyy-MM-dd\'T\'HH:mm:ss\'Z\''))

df = df.filter(col("text").isNotNull())\
        .filter(col("ID").isNotNull())\
        .filter(col("title").isNotNull())

df = df.withColumn("text", lower(regexp_replace(df.text, '[^\w\s]|\t|\d+', ' ')))

#write the df to a unmanaged delta table. 
df.write.format("delta").mode("overwrite").save("hdfs:///table_2g")

:: loading settings :: url = jar:file:/home/ubuntu/spark-3.3.2-bin-without-hadoop/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f451d78f-c8c4-4ebc-b747-e01f1ec32b1b;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.2.0 in central
	found io.delta#delta-storage;2.2.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
:: resolution report :: resolve 535ms :: artifacts dl 22ms
	:: modules in use:
	io.delta#delta-core_2.12;2.2.0 from central in [default]
	io.delta#delta-storage;2.2.0 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   

Keyword extraction

In [1]:
#start context and session
import findspark
findspark.init()
from pyspark.context import SparkContext

import pyspark
from pyspark.sql import SparkSession
from delta import *

from pyspark.sql.types import *
from pyspark.sql.functions import *

builder = pyspark.sql.SparkSession.builder.appName("MyApp").master('yarn') \
    .config("spark.executor.instances", 12) \
    .config("spark.executor.memory", "1536m") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# This reduces the amount of logs we recieve, only priting ERROR level or higher.
sc = spark.sparkContext
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )
sc.setLogLevel('ERROR')

#Read in data
dftest = spark.read.format("delta").load("hdfs:///table_2g").limit(2)  # query table by path

# convert text and title to an array
Ctext1 = dftest.withColumn("filt1", split("text", " "))\
               .withColumn("Ctit", split(lower(regexp_replace(dftest.title, '[^\w\s]', '')), " "))

#filter words in title from the array in filt
Ctext2 = Ctext1.withColumn("filt2", expr("array_except(filt1, Ctit)"))

#Remove empty strings
Ctext3 = Ctext2.withColumn("filt3", array_remove("filt2", ""))

#remove stopwords
from pyspark.ml.feature import StopWordsRemover # to remove stop words
stopwords_remover = StopWordsRemover(inputCol="filt3", outputCol="filtered_map")
Ctext4= stopwords_remover.transform(Ctext3)

#TF-IDF = TF* IDF
# TF = total number occurences in article / total number of words in article
# IDF = log(number of articles / number of articles that contain the word)

#Count number of documents to calculate the IDF later
num_doc = Ctext4.count()

#Count number of words in text
Ctext5 = Ctext4.withColumn("text_len", size(Ctext4.filtered_map))

#create row for each word in text
df_ex= Ctext5.select("ID", "text_len" , explode(Ctext5.filtered_map).alias("word_split"))

#group the df by the word_split column and count the distinct number of IDs for each word
df_count = df_ex.groupBy("word_split").agg(countDistinct("ID").alias("num_articles"))

from pyspark.sql.window import Window
#group the rows for the same word and article (ID) and calculate TF
df_ex = df_ex.groupBy("ID", "word_split", "text_len") \
        .agg(count("word_split").alias("count"), \
             expr("count(word_split) / text_len").alias("TF"))

# calculate the idf for every word 
df_join = df_ex.join(df_count, on="word_split", how="left") #join1

df_join = df_join.withColumn("IDF", log(num_doc / df_join.num_articles))

#calculate the tf*IDF for every word
df_join = df_join.withColumn("TF_IDF", expr("TF * IDF"))

#select the 10 keywords
from pyspark.sql.window import Window
key_part = Window.partitionBy("ID").orderBy(col("TF_IDF").desc())
df_key = df_join.withColumn("row",row_number().over(key_part))\
                .filter(col("row") <= 10)\
                .drop("row") \
                .groupby('ID')\
                .agg(collect_list('word_split').alias("keywords"))

df_new = dftest.select("ID", "subtitles", "related", "categories")\
                .filter(col("subtitles").isNotNull())\
                .join(df_key, on="ID", how="left")

df_new1 = df_new.select("ID", "keywords", concat_ws(" ", col("related"), col("subtitles"), col("categories")).alias("con_col"))

df_new1 = df_new1.filter((df_new1.keywords.isNotNull()) & (df_new1.con_col.isNotNull()))

df_new2 = df_new1.select("ID", "keywords", lower("con_col").alias("con_col_L"))

# #transform to list and make it distinct
df_new3 = df_new2.select("ID", "keywords", array_distinct(split("con_col_L", " ")).alias("con_split"))

# #compute the overlap
df_new4 = df_new3.select("ID", "keywords", size(array_intersect("con_split", "keywords")).alias("overlap"))

# # Compute the mean of the overlap column
mean_overlap = df_new4.agg(mean("overlap")).collect()[0][0]

# Print the mean value
print("Mean overlap:", mean_overlap)

df_new4.show(truncate=False)

#write the result to a new deltatable
# df_new.write.format("delta").mode("append").save("hdfs:///table_2g_demo")

:: loading settings :: url = jar:file:/home/ubuntu/spark-3.3.2-bin-without-hadoop/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e3e5623f-08af-407f-9fa4-2c9bebf19cda;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.2.0 in central
	found io.delta#delta-storage;2.2.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
:: resolution report :: resolve 478ms :: artifacts dl 24ms
	:: modules in use:
	io.delta#delta-core_2.12;2.2.0 from central in [default]
	io.delta#delta-storage;2.2.0 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   

Mean overlap: 0.0


[Stage 30:>                                                         (0 + 1) / 1]

+----------+---------------------------------------------------------------------------------------------------------------+-------+
|ID        |keywords                                                                                                       |overlap|
+----------+---------------------------------------------------------------------------------------------------------------+-------+
|1146534968|[waleswales, identified, inhabited, humans, years, evidenced, discovery, neanderthal, bontnewydd, palaeolithic]|0      |
+----------+---------------------------------------------------------------------------------------------------------------+-------+



                                                                                

Reduced Dataframe sizes

In [3]:
#start context and session
import findspark
findspark.init()
from pyspark.context import SparkContext

import pyspark
from pyspark.sql import SparkSession
from delta import *

from pyspark.sql.types import *
from pyspark.sql.functions import *

builder = pyspark.sql.SparkSession.builder.appName("MyApp").master('yarn') \
        .config("spark.executor.instances", 12) \
        .config("spark.executor.memory", "1536m") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

######fuck autoBroadcastJoinThreshold disabled!!!!!!!!!!!!

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# This reduces the amount of logs we recieve, only priting ERROR level or higher.
sc = spark.sparkContext
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )
sc.setLogLevel('ERROR')

#Read in df from deltatable
dftest = spark.read.format("delta").load("hdfs:///table_2g").limit(2) # query table by path

# convert title and text to an array
Ctit = split(lower(regexp_replace(dftest.title, '[^\w\s]', '')), " ")

Ctext1 = dftest.select("ID", split("text", " ").alias("filt1"), Ctit.alias("Ctit"))

#filter words in title from the array in filt
Ctext2 = Ctext1.select("ID", expr("array_except(filt1, Ctit)").alias("filt2"))

#Remove empty strings
Ctext3 = Ctext2.select("ID", array_remove("filt2", "").alias("filt3"))

#Remove stopwords 
from pyspark.ml.feature import StopWordsRemover # to remove stop words
stopwords_remover = StopWordsRemover(inputCol="filt3", outputCol="filt4")
Ctext4 = stopwords_remover.transform(Ctext3.select("ID","filt3"))

#Count number of documents
num_doc = Ctext4.count()

#Count number of words in text of each article and create an extra column
df_tf1 = Ctext4.select("ID", "filt4", size("filt4").alias("text_len"))

#Create row for each word in text
df_ex = df_tf1.select("ID","text_len", explode("filt4").alias("word_split"))

#Find the IDFs for each word
import math
df_count = df_ex.groupBy("word_split")\
        .agg(log(num_doc/countDistinct("ID")).alias("IDF"))\
        .where(col("IDF") > math.log(num_doc/(num_doc*0.2)))        #filter the words that occur in more than 20% of the documents

#Find TF and calculate the TF-IDF
df_ex2 = df_ex.groupBy("ID", "word_split", "text_len") \
        .agg((count("word_split") / col("text_len")).alias("TF"))

#Find the IDF for every word using a join and calculate the TF-IDF
df_join = df_ex2.join(df_count, on="word_split", how="left")\
                  .select("ID", "word_split", expr("TF*IDF").alias("TF_IDF"))

#Sort the df by ID
from pyspark.sql.window import Window
key_part = Window.partitionBy("ID").orderBy(col("TF_IDF").desc())
#select the 5 keywords with the highest TF-IDF value 
df_key = df_join.withColumn("row",row_number().over(key_part))\
                .filter(col("row") <= 10)\
                .drop("row") \
                .groupby('ID')\
                .agg(collect_list('word_split').alias("keywords"))

df_new = dftest.select("ID", "subtitles", "related", "categories")\
                .join(df_key.filter(col("keywords").isNotNull()), on="ID", how="left")

df_new1 = df_new.select("ID", "keywords", concat_ws(" ", col("related"), col("subtitles"), col("categories")).alias("con_col"))

df_new1 = df_new1.filter((df_new1.keywords.isNotNull()) & (df_new1.con_col.isNotNull()))

df_new2 = df_new1.select("ID", "keywords", lower("con_col").alias("con_col_L"))

# #transform to list and make it distinct
df_new3 = df_new2.select("ID", "keywords", array_distinct(split("con_col_L", " ")).alias("con_split"))

# #compute the overlap
df_new4 = df_new3.select("ID", "keywords", size(array_intersect("con_split", "keywords")).alias("overlap"))

# # Compute the mean of the overlap column
mean_overlap = df_new4.agg(mean("overlap")).collect()[0][0]

# Print the mean value
print("Mean overlap:", mean_overlap)

df_new4.show(truncate=False)

#write the result to a new deltatable
# df_new.write.format("delta").mode("append").save("hdfs:///table_2g_demo")

                                                                                

Mean overlap: 0.5


[Stage 43:=>(18 + 1) / 19][Stage 44:=>(17 + 2) / 19][Stage 69:>   (0 + 1) / 1]9]

+----------+---------------------------------------------------------------------------------------------------------------+-------+
|ID        |keywords                                                                                                       |overlap|
+----------+---------------------------------------------------------------------------------------------------------------+-------+
|1133240144|[zakraj, ek, july, ndash, september, slovene, mathematician, computer, scientist, born]                        |1      |
|1146534968|[waleswales, identified, inhabited, humans, years, evidenced, discovery, neanderthal, bontnewydd, palaeolithic]|0      |
+----------+---------------------------------------------------------------------------------------------------------------+-------+



                                                                                



In [None]:
#addtional analysis
df_new3.explain()
df_new3.rdd.getNumPartitions()

IO- optimzation: Partitioning (36) with cacheing

In [4]:
#start context and session
import findspark
findspark.init()
from pyspark.context import SparkContext


import pyspark
from pyspark.sql import SparkSession
from delta import *

from pyspark.sql.types import *
from pyspark.sql.functions import *

builder = pyspark.sql.SparkSession.builder.appName("MyApp").master('yarn') \
    .config("spark.executor.instances", 12) \
    .config("spark.executor.memory", "1536m") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# This reduces the amount of logs we recieve, only priting ERROR level or higher.
sc = spark.sparkContext
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )
sc.setLogLevel('ERROR')

#Read in df from deltatable
##############
#optimization#
##############

dftest = spark.read.format("delta")\
         .option("numPartitions", 72) \
         .option("partitionBy", "ID") \
         .load("hdfs:///table_2g")\
         .limit(2)
           # query table by path

# convert title and text to an array
Ctit = split(lower(regexp_replace(dftest.title, '[^\w\s]', '')), " ")

Ctext1 = dftest.select("ID", split("text", " ").alias("filt1"), Ctit.alias("Ctit"))

#filter words in title from the array in filt
Ctext2 = Ctext1.select("ID", expr("array_except(filt1, Ctit)").alias("filt2"))

#Remove empty strings
Ctext3 = Ctext2.select("ID", array_remove("filt2", "").alias("filt3"))

#Remove stopwords 
from pyspark.ml.feature import StopWordsRemover # to remove stop words
stopwords_remover = StopWordsRemover(inputCol="filt3", outputCol="filt4")
Ctext4 = stopwords_remover.transform(Ctext3.select("ID","filt3"))

#Count number of documents
num_doc = Ctext4.count()

#Count number of words in text of each article and create an extra column
df_tf1 = Ctext4.select("ID", "filt4", size("filt4").alias("text_len"))

#Create row for each word in text
df_ex = df_tf1.select("ID","text_len", explode("filt4").alias("word_split"))

#Find the IDFs for each word
import math
df_count = df_ex.groupBy("word_split")\
        .agg(log(num_doc/countDistinct("ID")).alias("IDF"))\
        .where(col("IDF") > math.log(num_doc/(num_doc*0.2)))        #filter the words that occur in more than 20% of the documents

#Find TF and calculate the TF-IDF
df_ex2 = df_ex.groupBy("ID", "word_split", "text_len") \
        .agg((count("word_split") / col("text_len")).alias("TF"))

##############
#optimization#
##############
df_ex2.repartition(72, "word_split")\
      .cache()

#Find the IDF for every word using a join and calculate the TF-IDF
df_join = df_ex2.join(df_count, on="word_split", how="left")\
                  .select("ID", "word_split", expr("TF*IDF").alias("TF_IDF"))

##############
#optimization#
##############
# remove df_ex from cache
df_ex2.unpersist()

#Sort the df by ID
from pyspark.sql.window import Window
key_part = Window.partitionBy("ID").orderBy(col("TF_IDF").desc())
#select the 5 keywords with the highest TF-IDF value 
df_key = df_join.withColumn("row",row_number().over(key_part))\
                .filter(col("row") <= 10)\
                .drop("row") \
                .groupby('ID')\
                .agg(collect_list('word_split').alias("keywords"))

df_new = dftest.select("ID", "subtitles", "related", "categories")\
                .join(df_key.filter(col("keywords").isNotNull()), on="ID", how="left")

df_new1 = df_new.select("ID", "keywords", concat_ws(" ", col("related"), col("subtitles"), col("categories")).alias("con_col"))

df_new1 = df_new1.filter((df_new1.keywords.isNotNull()) & (df_new1.con_col.isNotNull()))

df_new2 = df_new1.select("ID", "keywords", lower("con_col").alias("con_col_L"))

# #transform to list and make it distinct
df_new3 = df_new2.select("ID", "keywords", array_distinct(split("con_col_L", " ")).alias("con_split"))

# #compute the overlap
df_new4 = df_new3.select("ID", "keywords", size(array_intersect("con_split", "keywords")).alias("overlap"))

# # Compute the mean of the overlap column
mean_overlap = df_new4.agg(mean("overlap")).collect()[0][0]

# Print the mean value
print("Mean overlap:", mean_overlap)

df_new4.show(truncate=False)

#write the result to a new deltatable
# df_new.write.format("delta").mode("append").save("hdfs:///table_2g_demo")

2023-05-03 16:02:43,021 ERROR cluster.YarnScheduler: Lost executor 6 on datanode2: Container from a bad node: container_1682665359283_0032_01_000007 on host: datanode2. Exit status: 137. Diagnostics: [2023-05-03 16:02:40.033]Container killed on request. Exit code is 137
[2023-05-03 16:02:40.034]Container exited with a non-zero exit code 137. 
[2023-05-03 16:02:40.035]Killed by external signal
.
                                                                                

Mean overlap: 0.5




+----------+---------------------------------------------------------------------------------------------------------------+-------+
|ID        |keywords                                                                                                       |overlap|
+----------+---------------------------------------------------------------------------------------------------------------+-------+
|1133240144|[zakraj, ek, july, ndash, september, slovene, mathematician, computer, scientist, born]                        |1      |
|1146534968|[waleswales, identified, inhabited, humans, years, evidenced, discovery, neanderthal, bontnewydd, palaeolithic]|0      |
+----------+---------------------------------------------------------------------------------------------------------------+-------+



                                                                                



IO-optimization: Columnar compression

In [6]:
#start context and session
import findspark
findspark.init()
from pyspark.context import SparkContext

import pyspark
from pyspark.sql import SparkSession
from delta import *

from pyspark.sql.types import *
from pyspark.sql.functions import *

##############
#optimization#
##############
builder = pyspark.sql.SparkSession.builder.appName("MyApp").master('yarn') \
    .config("spark.executor.instances", 12) \
    .config("spark.executor.memory", "1536m") \
    .config("spark.driver.logLevel", "WARN")\
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.sql.parquet.compression.codec", "snappy") \
    .config("spark.sql.sources.default", "delta")\
    .config("spark.shuffle.compress", "true")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# This reduces the amount of logs we recieve, only priting ERROR level or higher.
sc = spark.sparkContext
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )
sc.setLogLevel('ERROR')

#Read in df from deltatable, now compressed
##############
#optimization#
##############
dftest = spark.read \
    .format("delta") \
    .option("compression", "snappy") \
    .load("hdfs:///table_2g") \
    .limit(2)


# convert title and text to an array
Ctit = split(lower(regexp_replace(dftest.title, '[^\w\s]', '')), " ")

Ctext1 = dftest.select("ID", split("text", " ").alias("filt1"), Ctit.alias("Ctit"))

#filter words in title from the array in filt
Ctext2 = Ctext1.select("ID", expr("array_except(filt1, Ctit)").alias("filt2"))

#Remove empty strings
Ctext3 = Ctext2.select("ID", array_remove("filt2", "").alias("filt3"))

#Remove stopwords 
from pyspark.ml.feature import StopWordsRemover # to remove stop words
stopwords_remover = StopWordsRemover(inputCol="filt3", outputCol="filt4")
Ctext4 = stopwords_remover.transform(Ctext3.select("ID","filt3"))

#Count number of documents
num_doc = Ctext4.count()

#Count number of words in text of each article and create an extra column
df_tf1 = Ctext4.select("ID", "filt4", size("filt4").alias("text_len"))

#Create row for each word in text
df_ex = df_tf1.select("ID","text_len", explode("filt4").alias("word_split"))

#Find the IDFs for each word
import math
df_count = df_ex.groupBy("word_split")\
        .agg(log(num_doc/countDistinct("ID")).alias("IDF"))\
        .where(col("IDF") > math.log(num_doc/(num_doc*0.2)))        #filter the words that occur in more than 20% of the documents

#Find TF and calculate the TF-IDF
df_ex2 = df_ex.groupBy("ID", "word_split", "text_len") \
        .agg((count("word_split") / col("text_len")).alias("TF"))

#Find the IDF for every word using a join and calculate the TF-IDF
df_join = df_ex2.join(df_count, on="word_split", how="left")\
                  .select("ID", "word_split", expr("TF*IDF").alias("TF_IDF"))

#Sort the df by ID
from pyspark.sql.window import Window
key_part = Window.partitionBy("ID").orderBy(col("TF_IDF").desc())
#select the 5 keywords with the highest TF-IDF value 
df_key = df_join.withColumn("row",row_number().over(key_part))\
                .filter(col("row") <= 10)\
                .drop("row") \
                .groupby('ID')\
                .agg(collect_list('word_split').alias("keywords"))

df_new = dftest.select("ID", "subtitles", "related", "categories")\
                .join(df_key.filter(col("keywords").isNotNull()), on="ID", how="left")

df_new1 = df_new.select("ID", "keywords", concat_ws(" ", col("related"), col("subtitles"), col("categories")).alias("con_col"))

df_new1 = df_new1.filter((df_new1.keywords.isNotNull()) & (df_new1.con_col.isNotNull()))

df_new2 = df_new1.select("ID", "keywords", lower("con_col").alias("con_col_L"))

# #transform to list and make it distinct
df_new3 = df_new2.select("ID", "keywords", array_distinct(split("con_col_L", " ")).alias("con_split"))

# #compute the overlap
df_new4 = df_new3.select("ID", "keywords", size(array_intersect("con_split", "keywords")).alias("overlap"))

# # Compute the mean of the overlap column
mean_overlap = df_new4.agg(mean("overlap")).collect()[0][0]

# Print the mean value
print("Mean overlap:", mean_overlap)

df_new4.show(truncate=False)

#write the result to a new deltatable
# df_new.write.format("delta").mode("append").save("hdfs:///table_2g_demo")

                                                                                

Mean overlap: 0.5




+----------+---------------------------------------------------------------------------------------------------------------+-------+
|ID        |keywords                                                                                                       |overlap|
+----------+---------------------------------------------------------------------------------------------------------------+-------+
|1133240144|[zakraj, ek, july, ndash, september, slovene, mathematician, computer, scientist, born]                        |1      |
|1146534968|[waleswales, identified, inhabited, humans, years, evidenced, discovery, neanderthal, bontnewydd, palaeolithic]|0      |
+----------+---------------------------------------------------------------------------------------------------------------+-------+



                                                                                

Exception in thread "serve-DataFrame" java.net.SocketTimeoutException: Accept timed out
	at java.net.PlainSocketImpl.socketAccept(Native Method)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
	at java.net.ServerSocket.implAccept(ServerSocket.java:560)
	at java.net.ServerSocket.accept(ServerSocket.java:528)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:64)


Map partition

In [8]:
#start context and session
import findspark
findspark.init()
from pyspark.context import SparkContext

import pyspark
from pyspark.sql import SparkSession
from delta import *

from pyspark.sql.types import *
from pyspark.sql.functions import *

builder = pyspark.sql.SparkSession.builder.appName("MyApp").master('yarn') \
    .config("spark.executor.instances", 12) \
    .config("spark.executor.memory", "1536m") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# This reduces the amount of logs we recieve, only priting ERROR level or higher.
sc = spark.sparkContext
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )
sc.setLogLevel('ERROR')

#Read in df from deltatable
dftest = spark.read.format("delta").load("hdfs:///table_2g").limit(2)  # query table by path

# convert title and text to an array
Ctit = split(lower(regexp_replace(dftest.title, '[^\w\s]', '')), " ")

Ctext1 = dftest.select("ID", split("text", " ").alias("filt1"), Ctit.alias("Ctit"))

#filter words in title from the array in filt
Ctext2 = Ctext1.select("ID", expr("array_except(filt1, Ctit)").alias("filt2"))

#Remove empty strings
Ctext3 = Ctext2.select("ID", array_remove("filt2", "").alias("filt3"))

#Remove stopwords 
from pyspark.ml.feature import StopWordsRemover # to remove stop words
stopwords_remover = StopWordsRemover(inputCol="filt3", outputCol="filt4")
Ctext4 = stopwords_remover.transform(Ctext3.select("ID","filt3"))

#Count number of documents
num_doc = Ctext4.count()

#Count number of words in text of each article and create an extra column
df_tf1 = Ctext4.select("ID", "filt4", size("filt4").alias("text_len"))

#Create row for each word in text
df_ex = df_tf1.select("ID","text_len", explode("filt4").alias("word_split"))

#Find the IDFs for each word
import math
df_count = df_ex.groupBy("word_split")\
        .agg(log(num_doc/countDistinct("ID")).alias("IDF"))\
        .where(col("IDF") > math.log(num_doc/(num_doc*0.2)))        #filter the words that occur in more than 20% of the documents

#Find TF and calculate the TF-IDF
df_ex2 = df_ex.groupBy("ID", "word_split", "text_len") \
        .agg((count("word_split") / col("text_len")).alias("TF"))

#Find the IDF for every word using a join and calculate the TF-IDF
df_join = df_ex2.join(df_count, on="word_split", how="left")\
                  .select("ID", "word_split", expr("TF*IDF").alias("TF_IDF"))

#Sort the df by ID
from pyspark.sql.window import Window
key_part = Window.partitionBy("ID").orderBy(col("TF_IDF").desc())
#select the 5 keywords with the highest TF-IDF value 
df_key = df_join.withColumn("row",row_number().over(key_part))\
                .filter(col("row") <= 10)\
                .drop("row") \
                .groupby('ID')\
                .agg(collect_list('word_split').alias("keywords"))

df_new = dftest.select("ID", "subtitles", "related", "categories")\
                .join(df_key.filter(col("keywords").isNotNull()), on="ID", how="left")

df_new1 = df_new.filter((df_new.keywords.isNotNull()))

##############
#optimization#
##############

def compute_overlap(partition):
    # Define a function to compute the overlap for a single row
    def compute_row_overlap(row):
        # Compute the concatenated column
        con_col = " ".join(str(val) for val in [row.related, row.subtitles,  row.categories])
        sub_split = [word.lower().strip() for word in con_col.split() if word.isalnum()]
        overlap = len(set(sub_split).intersection(set(row.keywords)))
        return (row.ID, row.keywords, overlap)

    # Process each row in the partition using the compute_row_overlap function
    return map(compute_row_overlap, partition)

# Convert the input DataFrame to an RDD and apply the compute_overlap function
df_new2 = df_new1.rdd \
                .mapPartitions(compute_overlap)\
                .toDF(["ID", "keywords", "overlap"])

# Convert the resulting RDD to a DataFrame and compute the mean overlap
mean_overlap = df_new2.agg(mean("overlap")).collect()[0][0]

print("Mean overlap:", mean_overlap)

df_new2.show(truncate=False)

#write the result to a new deltatable
# df_new.write.format("delta").mode("overwrite").save("hdfs:///table_2g_extended")

                                                                                

Mean overlap: 0.5
+----------+---------------------------------------------------------------------------------------------------------------+-------+
|ID        |keywords                                                                                                       |overlap|
+----------+---------------------------------------------------------------------------------------------------------------+-------+
|1133240144|[zakraj, ek, july, ndash, september, slovene, mathematician, computer, scientist, born]                        |1      |
|1146534968|[waleswales, identified, inhabited, humans, years, evidenced, discovery, neanderthal, bontnewydd, palaeolithic]|0      |
+----------+---------------------------------------------------------------------------------------------------------------+-------+



Using HOF

In [7]:
#start context and session
import findspark
findspark.init()
from pyspark.context import SparkContext

import pyspark
from pyspark.sql import SparkSession
from delta import *

from pyspark.sql.types import *
from pyspark.sql.functions import *

builder = pyspark.sql.SparkSession.builder.appName("MyApp").master('yarn') \
    .config("spark.executor.instances", 12) \
    .config("spark.executor.memory", "1536m") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# This reduces the amount of logs we recieve, only priting ERROR level or higher.
sc = spark.sparkContext
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )
sc.setLogLevel('ERROR')

#Read in df from deltatable
dftest = spark.read.format("delta").load("hdfs:///table_2g").limit(2)  # query table by path

# convert title and text to an array
Ctit = split(lower(regexp_replace(dftest.title, '[^\w\s]', '')), " ")

Ctext1 = dftest.select("ID", split("text", " ").alias("filt1"), Ctit.alias("Ctit"))

#filter words in title from the array in filt
Ctext2 = Ctext1.select("ID", expr("array_except(filt1, Ctit)").alias("filt2"))

#Remove empty strings
Ctext3 = Ctext2.select("ID", array_remove("filt2", "").alias("filt3"))

#Remove stopwords 
from pyspark.ml.feature import StopWordsRemover # to remove stop words
stopwords_remover = StopWordsRemover(inputCol="filt3", outputCol="filt4")
Ctext4 = stopwords_remover.transform(Ctext3.select("ID","filt3"))

#Count number of documents
num_doc = Ctext4.count()

# Apply the transformation and create a new column with the resulting array of {word, T} arrays
expr1 = "array_distinct(filt4)"
df_tf1 = Ctext4.select("ID", expr(expr1).alias("word_dis"), size("filt4").alias("text_len"))

expr2 = "transform(word_dis, x -> named_struct('word', x, 'tf', size(filter(word_dis, y -> y = x ))/ text_len))"
df_tf2 = df_tf1.select("ID", "word_dis", "text_len", expr(expr2).alias('TF_array'))

# create a df with for every distinct word in an article a row, and calculate IDF in one pass
import math
df_count = Ctext4.selectExpr("ID", "explode(filt4) as word_split")\
        .groupBy("word_split")\
        .agg(log(num_doc/countDistinct("ID")).alias("IDF"))#\
        #.where(col("IDF") > math.log(num_doc/(num_doc*0.2)))  

lookup_map = df_count.groupBy()\
              .agg(map_from_entries(collect_set(struct("word_split", "IDF"))).alias("idf_map"))\
              .selectExpr("idf_map")\
              .collect()[0]["idf_map"]

# Define a UDF that maps words to their IDF values using a dictionary
def map_idf(lookup_dict):
    def inner_map_idf(word_dis):
        return [lookup_dict[word] for word in word_dis]
    return udf(inner_map_idf, ArrayType(DoubleType()))

# Apply the UDF to the DataFrame
df_tf3 = df_tf2.select("ID", "TF_array", map_idf(lookup_map)(col("word_dis")).alias("IDF"))

#Calculate the TF-IDF
df_tf4 = df_tf3.select("ID", expr("transform(arrays_zip(TF_array, IDF), x -> named_struct('tf_idf', x.TF_array.tf * x.IDF, 'word', x.TF_array.word ))").alias("tf_idf"))

#Sort the words according to the TF*IDF score and select the first 5 words
df_tf5 = df_tf4.select("ID", expr("sort_array(tf_idf, False)").alias("sorted"))
df_key = df_tf5.select("ID", slice(expr("transform(sorted, x -> x.word)"), 1, 10).alias("keywords"))

df_new = dftest.select("ID", "subtitles", "related", "categories")\
                .join(df_key.filter(col("keywords").isNotNull()), on="ID", how="left")

df_new1 = df_new.select("ID", "keywords", concat_ws(" ", col("related"), col("subtitles"), col("categories")).alias("con_col"))

df_new1 = df_new1.filter((df_new1.keywords.isNotNull()) & (df_new1.con_col.isNotNull()))

#transform to list and make it distinct
df_new2 = df_new1.select("ID", "keywords", array_distinct(split("con_col", " ")).alias("con_split"))

#compute the overlap
df_new3 = df_new2.select("ID", "keywords", size(array_intersect("con_split", "keywords")).alias("overlap"))

# Compute the mean of the overlap column
mean_overlap = df_new3.agg(mean("overlap")).collect()[0][0]

# Print the mean value
print("Mean overlap:", mean_overlap)

df_new3.show(truncate=False)

#write the result to a new deltatable
# df_new.write.format("delta").mode("append").save("hdfs:///table_2g_demo")

                                                                                

Mean overlap: 0.0


                                                                                

+----------+--------------------------------------------------------------------+-------+
|ID        |keywords                                                            |overlap|
+----------+--------------------------------------------------------------------+-------+
|1146534968|[youth, yl, yeoman, yeast, years, year, y, xiii, x, wrexham]        |0      |
|1133240144|[zuse, zakraj, zagreb, z, yugoslavia, wrote, went, vlo, usage, unix]|0      |
+----------+--------------------------------------------------------------------+-------+

