# In [1]:
from pyspark import SparkConf, SparkContext
import csv

def extract_year(date):
    """Return the part of *date* that precedes its first "-".

    For a date string such as "2021-12-15" this is the year, "2021".
    A string that contains no "-" is returned unchanged.
    """
    year, _, _ = date.partition("-")
    return year

if __name__ == "__main__":
    conf = SparkConf().setMaster("local").setAppName("IMDB")
    sc = SparkContext(conf=conf)

    # Everything that uses the context runs inside try/finally so that the
    # SparkContext is stopped even when one of the jobs below fails.
    try:
        # Proposal 4
        # Most represented genres. Is there a difference between decades or
        # some other time periods?
        # Number of movies per decade, for the genre with the most and the
        # genre with the fewest movies made.

        mymoviedb = sc.textFile("mymoviedb.csv")
        header = mymoviedb.first()

        # Drop every line that is equal to the header line.
        dataWithoutHeader = mymoviedb.filter(lambda line: line != header)

        # Parse the CSV lines and keep (year, genres) pairs built from
        # columns 0 and 7. Rows with fewer than 8 columns (including blank
        # lines) are skipped: indexing row[7] on them would raise IndexError
        # and abort the whole job.
        data = (
            dataWithoutHeader
            .mapPartitions(lambda lines: csv.reader(lines))
            .filter(lambda row: len(row) > 7)
            .map(lambda row: (extract_year(row[0]), row[7]))
        )

        # One (genre, decade) pair per genre listed for a movie; the decade
        # is the first three characters of the year plus "0s", so a year of
        # "1994" becomes "1990s". Blank genre names are dropped so that an
        # empty genre field is never counted as a genre of its own.
        # Cached because three separate jobs below read this RDD.
        genre_per_decade = data.flatMap(
            lambda x: [
                (genre.strip(), x[0][:3] + "0s")
                for genre in x[1].split(",")
                if genre.strip()
            ]
        ).cache()

        # Total number of occurrences per genre. Cached because both the
        # max and the min lookup run over it.
        genre_counts = (
            genre_per_decade
            .map(lambda pair: (pair[0], 1))
            .reduceByKey(lambda a, b: a + b)
            .cache()
        )

        # Genres with the highest and the lowest total count. min()/max()
        # need a single pass each, so no full sort of the counts is required.
        max_tuple = genre_counts.max(key=lambda x: x[1])
        min_tuple = genre_counts.min(key=lambda x: x[1])

        # Capture plain strings so the filter closures ship only the names.
        max_genre = max_tuple[0]
        min_genre = min_tuple[0]

        filtered_min_tuple = genre_per_decade.filter(lambda x: x[0] == min_genre)
        filtered_max_tuple = genre_per_decade.filter(lambda x: x[0] == max_genre)

        # Count the occurrences of the least common genre by decade,
        # keyed and sorted by (genre, decade).
        min_genre_counts_by_decade = (
            filtered_min_tuple
            .map(lambda x: ((x[0], x[1]), 1))
            .reduceByKey(lambda a, b: a + b)
            .sortByKey()
        )

        # Count the occurrences of the most common genre by decade,
        # keyed and sorted by (genre, decade).
        max_genre_counts_by_decade = (
            filtered_max_tuple
            .map(lambda x: ((x[0], x[1]), 1))
            .reduceByKey(lambda a, b: a + b)
            .sortByKey()
        )

        # Write the results as text files.
        # NOTE: with Spark's default settings saveAsTextFile fails when the
        # output directory already exists; remove old output before re-running.
        output_path = "number_of_movies_for_min_genre_output"
        min_genre_counts_by_decade.saveAsTextFile(output_path)

        output_path = "number_of_movies_for_max_genre_output"
        max_genre_counts_by_decade.saveAsTextFile(output_path)
    finally:
        # Stop the Spark context
        sc.stop()
