In [1]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
#PYSPARK_DRIVER_PYTHON = 3.85
#PYSPARK_PYTHON = 3.85
import os
import sys
#import pyspark as spark

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# Same code as shown in SimpleApp.py


In [3]:
def open_with_spark(log_file="data/movies.csv", app_name="movieAnalysis"):
    spark = SparkSession.builder.config("spark.executor.memory", "2g").config("spark.driver.memory", "2g").config("spark.storage.memoryFraction", "0.6").appName(app_name).getOrCreate()
    df = spark.read.option("header",True).csv(log_file).cache()
    return df

In [4]:
import pyspark
"""
1. Summary statistics for each relevant data frame

"""
def summary_statistics(df: pyspark.sql.dataframe.DataFrame, cols= ["*"], stats = ["count", "mean", "stddev", "min", "25%", "50%", "75%", "max"]):
    df.select(*cols).summary(*stats).show()
# 1. Do all
def summary_statistics_complete(file= "data/ratings.csv", cols= ["*"], stats = ["count", "mean", "stddev", "min", "25%", "50%", "75%", "max"]):
    summary_statistics(open_with_spark(file), cols, stats)

"""

2. Join Dataframes READABLE version, more memory needed

"""
def join_ratings_and_movies_readable():  
    df_movie = open_with_spark()
    df_ratings = open_with_spark(log_file="data/ratings.csv", app_name="ratings")
    df_join = df_ratings.join(df_movie, "movieID")
    return df_join
    
# 2. Join Dataframes
def join_ratings_and_movies(): 
    return open_with_spark().join(open_with_spark(log_file="data/ratings.csv", app_name="ratings"), "movieID")

"""
3. Most-rated movies

Returns the top N movies with the most reviews (ratings)

"""
# 3. Most-rated movies
def most_rated(df: pyspark.sql.dataframe.DataFrame, N=10):
    return df.groupby("title").agg(F.count("rating")).withColumnRenamed("count(rating)", "Num_ratings").sort("Num_ratings", ascending=False).limit(N)

# 3. Function that does everything; can be used for timing or outputting or whatever
def most_rated_complete(N=10):
    JOINED = join_ratings_and_movies()
    TOP_N = most_rated(JOINED, N)
    return TOP_N

"""

4. Highest-average-rated movies

Returns the top N movies with the highest average reviews (ratings)

"""

# 4. Highest-average-rated movies
def best_average_rated(df: pyspark.sql.dataframe.DataFrame, N=10, MIN_RATINGS=50):
    T = df.groupby("title").agg(F.mean("rating"))
    H = T.withColumnRenamed("avg(rating)", "Mean_rating")
    # H.show(5)
    G = df.groupby("title").agg(F.count("rating")).withColumnRenamed("count(rating)", "Num_ratings")
    C = H.join(G, "title")
    C = C.filter(C.Num_ratings >= MIN_RATINGS).select(["title", "Mean_rating"])
    J = C.sort("Mean_rating", ascending=False)
    K = J.limit(N).withColumn("Mean_rating", F.round("Mean_rating",3))
    return K

# 4. Function that does everything; can be used for timing or outputting or whatever
def best_average_rated_complete(N=10, MIN_RATINGS=50):
    JOINED = join_ratings_and_movies()
    TOP_N = best_average_rated(JOINED, N, MIN_RATINGS)
    return TOP_N 

"""

5. Popular genres: 

Find the top N popular genres by calculating the average rating for each genre.

"""

def popular_genres(df: pyspark.sql.dataframe.DataFrame, N=5, MIN_RATINGS=10):
    L = df.groupby("genres").agg(F.mean("rating"))
    L.show(5)
    M = L.withColumnRenamed("avg(rating)", "Mean_rating")
    G = df.groupby("genres").agg(F.count("rating")).withColumnRenamed("count(rating)", "Num_ratings")
    M.show(5)
    K = M.join(G, "genres")
    P = K.filter(K.Num_ratings >= MIN_RATINGS).select(["genres", "Mean_rating"])
    N = P.sort("Mean_rating", ascending=False).limit(N).withColumn("Mean_rating", F.round("Mean_rating",3))
    return N
# 5. Retrives DF then computes Top N DF
def popular_genres_complete(N=5, MIN_RATINGS=10):
    JOINED = join_ratings_and_movies()
    TOP_N = popular_genres(JOINED, N)
    return TOP_N


In [4]:
"""

Print out number of movies and number of unqiue users.

"""

summary_statistics_complete("data/movies.csv",["title"], ["count"])
# Get df of unique users
df_unique_users = open_with_spark("data/ratings.csv").select("UserID").distinct()
summary_statistics(df_unique_users,["userID"], ["count"])

23/04/18 17:53:18 WARN Utils: Your hostname, Anthonys-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.67.104.86 instead (on interface en0)
23/04/18 17:53:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/18 17:53:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
+-------+-----+
|summary|title|
+-------+-----+
|  count|58098|
+-------+-----+



[Stage 5:>                                                          (0 + 8) / 8]

23/04/18 17:53:34 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
23/04/18 17:53:34 WARN MemoryStore: Not enough space to cache rdd_35_5 in memory! (computed 54.7 MiB so far)
23/04/18 17:53:34 WARN BlockManager: Persisting block rdd_35_5 to disk instead.
23/04/18 17:53:34 WARN MemoryStore: Not enough space to cache rdd_35_1 in memory! (computed 54.3 MiB so far)
23/04/18 17:53:34 WARN BlockManager: Persisting block rdd_35_1 to disk instead.
23/04/18 17:53:34 WARN MemoryStore: Not enough space to cache rdd_35_7 in memory! (computed 54.6 MiB so far)
23/04/18 17:53:34 WARN BlockManager: Persisting block rdd_35_7 to disk instead.
23/04/18 17:53:34 WARN MemoryStore: Not enough space to cache rdd_35_3 in memory! (computed 54.4 MiB so far)
23/04/18 17:53:34 WARN BlockManager: Persisting block r



+-------+------+
|summary|userID|
+-------+------+
|  count|283228|
+-------+------+



                                                                                

In [7]:
summary_statistics_complete("data/ratings.csv",["rating"])

23/04/17 14:41:22 WARN CacheManager: Asked to cache already cached data.
23/04/17 14:41:22 WARN MemoryStore: Not enough space to cache rdd_35_0 in memory! (computed 22.6 MiB so far)
23/04/17 14:41:22 WARN MemoryStore: Not enough space to cache rdd_35_2 in memory! (computed 35.2 MiB so far)
23/04/17 14:41:22 WARN MemoryStore: Not enough space to cache rdd_35_1 in memory! (computed 35.3 MiB so far)




+-------+------------------+
|summary|            rating|
+-------+------------------+
|  count|          27753444|
|   mean|3.5304452124932677|
| stddev| 1.066352750231989|
|    min|               0.5|
|    25%|               3.0|
|    50%|               3.5|
|    75%|               4.0|
|    max|               5.0|
+-------+------------------+



                                                                                

In [6]:
import matplotlib.pyplot as plt
R = open_with_spark("data/ratings.csv").select("rating")
plt.hist(R.collect())
plt.show()

23/04/18 17:33:23 WARN CacheManager: Asked to cache already cached data.
23/04/18 17:33:23 WARN MemoryStore: Not enough space to cache rdd_35_0 in memory! (computed 22.6 MiB so far)
23/04/18 17:33:23 WARN MemoryStore: Not enough space to cache rdd_35_1 in memory! (computed 35.3 MiB so far)
23/04/18 17:33:23 WARN MemoryStore: Not enough space to cache rdd_35_6 in memory! (computed 35.4 MiB so far)


                                                                                

Py4JJavaError: An error occurred while calling o79.collectToPython.
: java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.sql.execution.SparkPlan$$anon$1._next(SparkPlan.scala:393)
	at org.apache.spark.sql.execution.SparkPlan$$anon$1.getNext(SparkPlan.scala:402)
	at org.apache.spark.sql.execution.SparkPlan$$anon$1.getNext(SparkPlan.scala:388)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollect$1(SparkPlan.scala:425)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollect$1$adapted(SparkPlan.scala:424)
	at org.apache.spark.sql.execution.SparkPlan$$Lambda$3709/0x0000000801f66cf8.apply(Unknown Source)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:424)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3688)
	at org.apache.spark.sql.Dataset$$Lambda$3660/0x0000000801f2d168.apply(Unknown Source)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
	at org.apache.spark.sql.Dataset$$Lambda$2086/0x0000000801b944f0.apply(Unknown Source)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
	at org.apache.spark.sql.Dataset$$Lambda$1753/0x0000000801abd458.apply(Unknown Source)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$$$Lambda$1764/0x0000000801ac0d48.apply(Unknown Source)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.execution.SQLExecution$$$Lambda$1754/0x0000000801abd718.apply(Unknown Source)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3685)
	at java.base/java.lang.invoke.DirectMethodHandle$Holder.invokeVirtual(DirectMethodHandle$Holder)
	at java.base/java.lang.invoke.LambdaForm$MH/0x0000000801160400.invoke(LambdaForm$MH)


In [5]:
# 2. Join Dataframes READABLE version, more memory needed
def join_ratings_and_movies_readable():  
    df_movie = open_with_spark()
    df_ratings = open_with_spark(log_file="data/ratings.csv", app_name="ratings")
    df_join = df_ratings.join(df_movie, "movieID")
    return df_join
    
# 2. Join Dataframes
def join_ratings_and_movies(): 
    return open_with_spark().join(open_with_spark(log_file="data/ratings.csv", app_name="ratings"), "movieID")

In [6]:
X = join_ratings_and_movies()
X.show(5)

23/04/18 18:10:41 WARN Utils: Your hostname, Anthonys-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.67.104.86 instead (on interface en0)
23/04/18 18:10:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/18 18:10:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/04/18 18:10:44 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


[Stage 3:>                                                          (0 + 1) / 1]

+-------+--------------------+--------------+------+------+----------+
|movieId|               title|        genres|userId|rating| timestamp|
+-------+--------------------+--------------+------+------+----------+
|    307|Three Colors: Blu...|         Drama|     1|   3.5|1256677221|
|    481|   Kalifornia (1993)|Drama|Thriller|     1|   3.5|1256677456|
|   1091|Weekend at Bernie...|        Comedy|     1|   1.5|1256677471|
|   1257|Better Off Dead.....|Comedy|Romance|     1|   4.5|1256677460|
|   1449|Waiting for Guffm...|        Comedy|     1|   4.5|1256677264|
+-------+--------------------+--------------+------+------+----------+
only showing top 5 rows



                                                                                

23/04/18 18:10:57 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [6]:
df_updated = X.groupby("title").agg(F.count("rating")).withColumnRenamed("count(rating)", "Num_ratings").sort("Num_ratings", ascending=False)
df_updated.limit(5)




23/04/13 17:40:44 WARN MemoryStore: Not enough space to cache rdd_32_1 in memory! (computed 35.3 MiB so far)
23/04/13 17:40:44 WARN BlockManager: Persisting block rdd_32_1 to disk instead.
23/04/13 17:40:50 WARN MemoryStore: Not enough space to cache rdd_32_7 in memory! (computed 54.6 MiB so far)
23/04/13 17:40:50 WARN BlockManager: Persisting block rdd_32_7 to disk instead.
23/04/13 17:40:50 WARN MemoryStore: Not enough space to cache rdd_32_2 in memory! (computed 54.4 MiB so far)
23/04/13 17:40:50 WARN BlockManager: Persisting block rdd_32_2 to disk instead.
23/04/13 17:40:50 WARN MemoryStore: Not enough space to cache rdd_32_6 in memory! (computed 54.7 MiB so far)
23/04/13 17:40:50 WARN BlockManager: Persisting block rdd_32_6 to disk instead.
23/04/13 17:40:50 WARN MemoryStore: Not enough space to cache rdd_32_4 in memory! (computed 54.6 MiB so far)
23/04/13 17:40:50 WARN BlockManager: Persisting block rdd_32_4 to disk instead.
23/04/13 17:40:51 WARN MemoryStore: Not enough space to

                                                                                

Row(title='Shawshank Redemption, The (1994)', Num_ratings=97999)

In [10]:
"""
Returns the top N movies with the most reviews (ratings)

"""
# (may be necessary for @param type)
import pyspark

# 3. Most-rated movies
def most_rated(df: pyspark.sql.dataframe.DataFrame, N=10):
    if N == None:
        return df.groupby("title").agg(F.count("rating")).withColumnRenamed("count(rating)", "Num_ratings").sort("Num_ratings", ascending=False)
    else:
        return df.groupby("title").agg(F.count("rating")).withColumnRenamed("count(rating)", "Num_ratings").sort("Num_ratings", ascending=False).limit(N)

# Function that does everything; can be used for timing or outputting or whatever
def most_rated_complete(N=10):
    JOINED = join_ratings_and_movies()
    TOP_N = most_rated(JOINED, N)
    return TOP_N


In [7]:
most_rated(X, 5).show(5)



+--------------------+-----------+
|               title|Num_ratings|
+--------------------+-----------+
|Shawshank Redempt...|      97999|
| Forrest Gump (1994)|      97040|
| Pulp Fiction (1994)|      92406|
|Silence of the La...|      87899|
|  Matrix, The (1999)|      84545|
+--------------------+-----------+



                                                                                

In [8]:
X.columns

['movieId', 'title', 'genres', 'userId', 'rating', 'timestamp']

In [31]:
most_rated_df = most_rated(X, 500)

In [34]:
most_rated_df.repartition(1).write.csv('results/500_most_rated.csv', header=True)

23/04/18 18:09:18 WARN MemoryStore: Not enough space to cache rdd_35_4 in memory! (computed 22.6 MiB so far)
23/04/18 18:09:18 WARN MemoryStore: Not enough space to cache rdd_35_3 in memory! (computed 22.6 MiB so far)
23/04/18 18:09:18 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/04/18 18:09:18 WARN MemoryStore: Not enough space to cache rdd_35_2 in memory! (computed 22.5 MiB so far)


                                                                                

In [4]:
"""
Returns the top N movies with the highest average reviews (ratings)

"""
# (may be necessary for @param type)
import pyspark

# 4. Highest-average-rated movies
def best_average_rated(df: pyspark.sql.dataframe.DataFrame, N=10, MIN_RATINGS=50):
    T = df.groupby("title").agg(F.mean("rating"))
    H = T.withColumnRenamed("avg(rating)", "Mean_rating")
    # H.show(5)
    G = df.groupby("title").agg(F.count("rating")).withColumnRenamed("count(rating)", "Num_ratings")
    C = H.join(G, "title")
    C = C.filter(C.Num_ratings >= MIN_RATINGS).select(["title", "Mean_rating"])
    J = C.sort("Mean_rating", ascending=False)
    K = J.limit(N).withColumn("Mean_rating", F.round("Mean_rating",3))
    return K

# Function that does everything; can be used for timing or outputting or whatever
def best_average_rated_complete(N=10, MIN_RATINGS=50):
    JOINED = join_ratings_and_movies()
    TOP_N = best_average_rated(JOINED, N, MIN_RATINGS)
    return TOP_N #.select("*", round("Mean_ratings"))

In [5]:
A = best_average_rated_complete()
A.show(10)

23/04/14 16:21:07 WARN Utils: Your hostname, Coopers-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 172.16.108.70 instead (on interface en0)
23/04/14 16:21:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/14 16:21:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/04/14 16:21:16 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


[Stage 2:>                  (0 + 1) / 1][Stage 3:>                  (0 + 1) / 1]

23/04/14 16:21:18 WARN BlockManager: Block rdd_23_0 already exists on this machine; not re-adding it


[Stage 4:>                  (0 + 8) / 8][Stage 5:>                  (0 + 0) / 8]

23/04/14 16:21:45 WARN MemoryStore: Not enough space to cache rdd_37_1 in memory! (computed 54.3 MiB so far)
23/04/14 16:21:45 WARN BlockManager: Persisting block rdd_37_1 to disk instead.
23/04/14 16:21:45 WARN MemoryStore: Not enough space to cache rdd_37_2 in memory! (computed 54.4 MiB so far)
23/04/14 16:21:45 WARN BlockManager: Persisting block rdd_37_2 to disk instead.
23/04/14 16:21:45 WARN MemoryStore: Not enough space to cache rdd_37_0 in memory! (computed 54.4 MiB so far)
23/04/14 16:21:45 WARN BlockManager: Persisting block rdd_37_0 to disk instead.
23/04/14 16:21:45 WARN MemoryStore: Not enough space to cache rdd_37_4 in memory! (computed 54.6 MiB so far)
23/04/14 16:21:45 WARN BlockManager: Persisting block rdd_37_4 to disk instead.
23/04/14 16:21:45 WARN MemoryStore: Not enough space to cache rdd_37_3 in memory! (computed 54.4 MiB so far)
23/04/14 16:21:45 WARN BlockManager: Persisting block rdd_37_3 to disk instead.
23/04/14 16:21:46 WARN MemoryStore: Not enough space to

[Stage 4:==>                (1 + 7) / 8][Stage 5:>                  (0 + 1) / 8]

23/04/14 16:21:56 WARN MemoryStore: Not enough space to cache rdd_37_0 in memory! (computed 13.0 MiB so far)




23/04/14 16:21:57 WARN MemoryStore: Not enough space to cache rdd_37_1 in memory! (computed 54.3 MiB so far)
23/04/14 16:21:57 WARN MemoryStore: Not enough space to cache rdd_37_2 in memory! (computed 54.4 MiB so far)




23/04/14 16:21:59 WARN MemoryStore: Not enough space to cache rdd_37_7 in memory! (computed 22.6 MiB so far)
23/04/14 16:21:59 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/04/14 16:21:59 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.




+--------------------+-----------+
|               title|Mean_rating|
+--------------------+-----------+
|Planet Earth II (...|      4.487|
| Planet Earth (2006)|      4.458|
|Shawshank Redempt...|      4.424|
|Band of Brothers ...|        4.4|
|Black Mirror: Whi...|      4.351|
|              Cosmos|      4.344|
|The Godfather Tri...|       4.34|
|Godfather, The (1...|      4.333|
|Usual Suspects, T...|      4.292|
|        Black Mirror|      4.264|
+--------------------+-----------+



                                                                                

In [14]:
"""
5. Popular genres: Find the top N popular genres by calculating the average rating for each genre.
"""
import pyspark
def popular_genres(df: pyspark.sql.dataframe.DataFrame, N=5, MIN_RATINGS=10):
    L = df.groupby("genres").agg(F.mean("rating"))
    L.show(5)
    M = L.withColumnRenamed("avg(rating)", "Mean_rating")
    G = df.groupby("genres").agg(F.count("rating")).withColumnRenamed("count(rating)", "Num_ratings")
    M.show(5)
    K = M.join(G, "genres")
    P = K.filter(K.Num_ratings >= MIN_RATINGS).select(["genres", "Mean_rating"])
    N = P.sort("Mean_rating", ascending=False).limit(N).withColumn("Mean_rating", F.round("Mean_rating",3))
    return N
def popular_genres_complete(N=5, MIN_RATINGS=10):
    JOINED = join_ratings_and_movies()
    TOP_N = popular_genres(JOINED, N)
    return TOP_N

In [13]:
W = popular_genres_complete(10)
W.show(10)


23/04/16 18:02:52 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
23/04/16 18:02:52 WARN CacheManager: Asked to cache already cached data.
23/04/16 18:02:52 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
23/04/16 18:02:53 WARN CacheManager: Asked to cache already cached data.
23/04/16 18:02:53 WARN MemoryStore: Not enough space to cache rdd_32_0 in memory! (computed 22.6 MiB so far)
23/04/16 18:02:53 WARN MemoryStore: Not enough space to cache rdd_32_2 in memory! (computed 22.5 MiB so far)
23/04/16 18:02:53 WARN MemoryStore: Not enough space to cache rdd_32_1 in memory! (computed 35.3 MiB so far)


                                                                                

+--------------------+------------------+
|              genres|       avg(rating)|
+--------------------+------------------+
|Comedy|Horror|Thr...| 3.288320727995902|
|Adventure|Sci-Fi|...| 3.212121212121212|
|Action|Adventure|...| 4.011721534573262|
| Action|Drama|Horror|3.7695176529090006|
|Action|Animation|...|  3.76522506619594|
+--------------------+------------------+
only showing top 5 rows

23/04/16 18:02:57 WARN MemoryStore: Not enough space to cache rdd_32_1 in memory! (computed 22.5 MiB so far)
23/04/16 18:02:57 WARN MemoryStore: Not enough space to cache rdd_32_0 in memory! (computed 22.6 MiB so far)
23/04/16 18:02:57 WARN MemoryStore: Not enough space to cache rdd_32_2 in memory! (computed 35.2 MiB so far)


                                                                                

+--------------------+------------------+
|              genres|       Mean_rating|
+--------------------+------------------+
|Comedy|Horror|Thr...| 3.288320727995902|
|Adventure|Sci-Fi|...| 3.212121212121212|
|Action|Adventure|...| 4.011721534573262|
| Action|Drama|Horror|3.7695176529090006|
|Action|Animation|...|  3.76522506619594|
+--------------------+------------------+
only showing top 5 rows

23/04/16 18:03:02 WARN MemoryStore: Not enough space to cache rdd_32_0 in memory! (computed 22.6 MiB so far)
23/04/16 18:03:02 WARN MemoryStore: Not enough space to cache rdd_32_1 in memory! (computed 22.5 MiB so far)
23/04/16 18:03:02 WARN MemoryStore: Not enough space to cache rdd_32_2 in memory! (computed 35.2 MiB so far)




23/04/16 18:03:04 WARN MemoryStore: Not enough space to cache rdd_32_1 in memory! (computed 22.5 MiB so far)
23/04/16 18:03:04 WARN MemoryStore: Not enough space to cache rdd_32_0 in memory! (computed 35.3 MiB so far)
23/04/16 18:03:04 WARN MemoryStore: Not enough space to cache rdd_32_2 in memory! (computed 22.5 MiB so far)


                                                                                

+--------------------+-----------+
|              genres|Mean_rating|
+--------------------+-----------+
|Action|Adventure|...|      4.201|
|Film-Noir|Romance...|      4.164|
|Action|Crime|Dram...|      4.163|
|Action|Adventure|...|      4.157|
|Action|Crime|Dram...|      4.156|
|Adventure|Animati...|      4.152|
|Animation|Childre...|      4.145|
|   Film-Noir|Mystery|      4.128|
|Crime|Film-Noir|M...|      4.127|
|Action|Adventure|...|       4.12|
+--------------------+-----------+



In [5]:
"""
6. Year-wise analysis: 

Extract the release year from the movie title and analyze the number of movies released and their average ratings per year.

"""

# code needs some work filtering out bad years, works in general but some title must not end with the year in ( )
def year_analysis(df: pyspark.sql.dataframe.DataFrame):
    # add year column
    J = df.withColumn("year", F.col("title").substr(F.length("title")-4, F.length("title")).substr(1,4)).select("movieID", "year")
    K = J.join(open_with_spark("data/ratings.csv").select("movieID", "rating"), "movieID")
    #K.show(200)
    # get average rating per year
    #D = K.groupby("year").agg(F.mean("rating")).withColumnRenamed("avg(rating)", "Mean_rating").sort("Mean_rating", ascending = False)
    K = K.groupby("year").agg(F.count("rating")).withColumnRenamed("count(rating)", "Num_ratings").sort("Num_ratings", ascending=False)
    return  K #K.join(D, "year")
def year_analysis_complete():
    X = open_with_spark()
    Y = year_analysis(X)
    return Y

In [7]:
A = year_analysis_complete()
A.show(1000)

23/04/17 16:39:58 WARN CacheManager: Asked to cache already cached data.
23/04/17 16:39:59 WARN CacheManager: Asked to cache already cached data.
23/04/17 16:39:59 WARN MemoryStore: Not enough space to cache rdd_37_2 in memory! (computed 22.5 MiB so far)
23/04/17 16:39:59 WARN MemoryStore: Not enough space to cache rdd_37_1 in memory! (computed 22.5 MiB so far)
23/04/17 16:39:59 WARN MemoryStore: Not enough space to cache rdd_37_0 in memory! (computed 35.3 MiB so far)




+------+-----------+
|  year|Num_ratings|
+------+-----------+
|  1995|    1767979|
|  1994|    1529657|
|  1996|    1334834|
|  1999|    1305889|
|  2000|    1089609|
|  1993|    1082362|
|  1997|    1080028|
|  1998|    1040732|
|  2001|     983000|
|  2002|     885726|
|  2004|     849291|
|  2003|     779270|
|  2006|     617677|
|  2007|     579220|
|  2005|     570873|
|  1992|     567142|
|  1989|     558507|
|  1990|     548939|
|  2008|     547974|
|  2009|     507131|
|  1991|     486961|
|  2010|     456500|
|  1988|     428412|
|  1987|     424588|
|  1986|     422299|
|  1984|     403551|
|  2011|     372899|
|  2012|     362019|
|  2014|     356580|
|  1985|     348500|
|  2013|     336980|
|  1982|     303667|
|  2015|     278900|
|  1980|     270448|
|  1983|     245945|
|  1981|     245589|
|  1979|     220638|
|  2016|     204613|
|  1975|     182415|
|  1977|     170660|
|  1971|     153949|
|  1978|     136967|
|  1973|     132878|
|  1974|     130366|
|  1968|     

                                                                                