#!/usr/bin/env python3
# coding: utf-8

import findspark
import pyspark
import sys
import os

from pyspark.sql import SparkSession

def tokenize_document(doc):
    """Turn one (path, text) pair into ((word, filename), 1) records.

    Args:
        doc: A two-item sequence whose first item is the document path and
            whose second item is the document's full text, as consumed from
            ``sc.wholeTextFiles``.

    Returns:
        A list with one ``((word, filename), 1)`` entry per
        whitespace-separated token, where ``filename`` is the last path
        component of the document path.
    """
    path, text = doc
    filename = os.path.basename(path)
    return [((word, filename), 1) for word in text.split()]


def rank_postings(postings):
    """Sort (filename, count) pairs by count descending, then filename ascending.

    Args:
        postings: Any iterable of ``(filename, count)`` pairs.

    Returns:
        A new sorted list; the input iterable is not modified.
    """
    return sorted(postings, key=lambda posting: (-posting[1], posting[0]))


def format_entry(word, documents):
    """Render one index line as ``word<TAB>count file, count file, ...``.

    Args:
        word: The indexed token.
        documents: Iterable of ``(filename, count)`` pairs in output order.

    Returns:
        The formatted line without a trailing newline.
    """
    listing = ", ".join(f"{count} {doc}" for doc, count in documents)
    return f"{word}\t{listing}"


def build_inverted_index(rdd):
    """Chain the RDD transformations that produce the ranked inverted index.

    Args:
        rdd: An RDD of (path, text) pairs.

    Returns:
        An RDD of ``(word, [(filename, count), ...])`` with each list ordered
        by ``rank_postings``.
    """
    # Count occurrences per (word, filename) pair.
    counts = rdd.flatMap(tokenize_document).reduceByKey(lambda a, b: a + b)

    # Re-key by word alone so every file's count for a word ends up together.
    by_word = counts.map(lambda item: (item[0][0], (item[0][1], item[1])))

    # Group by word, then sort each word's postings as per specified rules.
    return by_word.groupByKey().mapValues(rank_postings)


def main(argv):
    """Build and print the inverted index for the folder named in ``argv``.

    Args:
        argv: Command-line vector; ``argv[1]`` must be the input folder.

    Returns:
        Process exit status: 1 when the argument count is wrong, 0 otherwise.
    """
    if len(argv) != 2:
        print("Usage: BetterInvertedIndex.py <input folder>")
        return 1

    # Create a Spark session
    spark = SparkSession.builder.master('local[*]').appName("BetterInvertedIndex").getOrCreate()
    try:
        # Read input data
        rdd = spark.sparkContext.wholeTextFiles(argv[1])

        # Collect and print the better inverted index
        for word, documents in build_inverted_index(rdd).collect():
            print(format_entry(word, documents))
    finally:
        # Release the session even if the job fails part-way through.
        spark.stop()
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv))
