In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession

In [4]:
from pyspark.sql.functions import col, regexp_replace, split, lower, explode, length, size, monotonically_increasing_id

In [5]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [6]:
# Create (or reuse, via getOrCreate) the SparkSession shared by the code in
# this file: local mode with master "local[*]", app name "petition_analyzer".
spark = SparkSession.builder.master("local[*]").appName("petition_analyzer").getOrCreate()

In [7]:
# Set the "spark.hadoop.fs.native.io.enabled" option to "false" on the session.
# NOTE(review): presumably a workaround for Hadoop native-IO problems on
# Windows (the input is read from a C:\ path elsewhere in this file) -- confirm
# that it is still required and that setting it at runtime takes effect.
spark.conf.set("spark.hadoop.fs.native.io.enabled", "false")

In [8]:
def petition_top_20_word_count(
    input_path="C:\\Aviva\\input_data.json",
    top_n=20,
    min_word_length=5,
):
    """Count, per petition, the occurrences of the most frequent long words.

    The petition abstracts are read from a JSON file, stripped of every
    character that is not an ASCII letter, digit or whitespace, lower-cased
    and tokenised on runs of whitespace. The ``top_n`` most frequent tokens
    that are at least ``min_word_length`` characters long are selected
    (ties broken alphabetically so the result is reproducible), and each
    petition gets one integer column per selected word holding the number
    of whole-word occurrences in its abstract.

    Uses the module-level ``spark`` session.

    Args:
        input_path: Location of the petitions JSON file. Defaults to the
            path that was previously hard-coded.
        top_n: How many of the most frequent words to report.
        min_word_length: Minimum number of characters a token must have to
            be considered a word (values below 1 are treated as 1).

    Returns:
        A DataFrame with a ``petitionId`` column followed by one integer
        column per selected word, named after that word.
    """
    # "label" and "numberOfSignatures" describe the input but are not needed
    # for the word counts; only "abstract._value" is selected below.
    schema = StructType([
        StructField("abstract", StructType([
            StructField("_value", StringType(), True),
        ]), True),
        StructField("label", StructType([
            StructField("_value", StringType(), True),
        ]), True),
        StructField("numberOfSignatures", IntegerType(), True),
    ])

    petitions_df = spark.read.schema(schema).json(input_path)

    # Clean and lower-case the abstract once. Missing abstracts become empty
    # strings so that their word counts come out as 0 instead of a negative
    # number / NULL (size() of a NULL array is -1 or NULL depending on the
    # Spark configuration).
    # NOTE: monotonically_increasing_id() yields unique, increasing ids that
    # are only consecutive while the data sits in a single partition.
    text_df = (
        petitions_df
        .select(
            lower(
                regexp_replace(col("abstract._value"), r"[^A-Za-z0-9\s]", "")
            ).alias("petition_text")
        )
        .na.fill("", subset=["petition_text"])
        .withColumn("petitionId", monotonically_increasing_id() + 1)
    )

    # Tokenise on any run of whitespace: the cleaning step keeps newlines and
    # tabs, so splitting on a single space would glue words together.
    # Requiring at least one character also discards the empty tokens that
    # leading/trailing whitespace produces.
    words_df = (
        text_df
        .select(explode(split(col("petition_text"), r"\s+")).alias("word"))
        .filter(length(col("word")) >= max(min_word_length, 1))
    )

    # The secondary sort on the word makes the selection deterministic when
    # several words share the same frequency.
    top_rows = (
        words_df
        .groupBy("word")
        .count()
        .orderBy(col("count").desc(), col("word").asc())
        .limit(top_n)
        .collect()
    )
    top_words = [row["word"] for row in top_rows]

    # Splitting on "\bword\b" produces (occurrences + 1) pieces, so
    # size - 1 is the number of whole-word matches. The word boundaries keep
    # e.g. "their" from being counted inside "theirs". Words contain only
    # [a-z0-9], so they are safe to embed in the regular expression.
    word_columns = [
        (size(split(col("petition_text"), r"\b" + word + r"\b")) - 1).alias(word)
        for word in top_words
    ]

    return text_df.select("petitionId", *word_columns)