# Setting up the PySpark environment

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz 
!tar -xvf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark


# Initializing the Spark session

In [2]:
from pyspark.sql import SparkSession, functions as f

# getOrCreate() reuses an already-running session if there is one,
# otherwise it starts a new session named "Movies_Metrics".
spark = (
    SparkSession.builder
    .appName("Movies_Metrics")
    .getOrCreate()
)


# Getting data from raw files 


In [3]:
# Raw data files hosted in the justmarkham/pandas-videos GitHub repository.
_RAW_BASE = "https://raw.githubusercontent.com/justmarkham/pandas-videos/master/data"
data = _RAW_BASE + "/u.data"    # ratings (read below as tab-separated)
items = _RAW_BASE + "/u.item"   # movies (read below as pipe-separated)
users = _RAW_BASE + "/u.user"   # users (read below as pipe-separated)

# Loading the data with the pandas library

In [4]:
import pandas as pd
from pandas._libs import index
# Ratings: one row per rating, tab-separated.
df_data = pd.read_csv(data, sep="\t", names=["userID", "itemID", "rating", "timestamp"])

# Movies: only the first three pipe-separated fields are needed.
# keep_default_na=False keeps empty text fields as "" instead of float NaN, so the
# text columns stay pure strings (a str/NaN mix can break Spark's schema inference)
# without forcing movieID to a string as a blanket .astype("str") would. movieID
# therefore stays an integer and joins directly against the integer itemID.
df_items = pd.read_csv(
    items,
    sep="|",
    usecols=range(3),
    names=["movieID", "movieTitle", "releaseDate"],
    dtype={"movieTitle": str, "releaseDate": str},
    keep_default_na=False,
)

# Users: demographic attributes, pipe-separated.
df_users = pd.read_csv(users, sep="|", names=["userID", "age", "gender", "occupation", "zipCode"])


# Converting pandas DataFrames into Spark DataFrames

In [5]:
# Convert each pandas frame into a Spark DataFrame (schema inferred from the data).
spark_udata, spark_item, spark_users = (
    spark.createDataFrame(pandas_df) for pandas_df in (df_data, df_items, df_users)
)

# METRICS




### Finding the 3 users who rated the most movies


In [6]:
# Number of ratings submitted by each user.
subdf = (
    spark_udata
    .groupBy("userID")
    .agg(f.count(f.lit(1)).alias("amount"))
)

In [None]:
# Top 3 users by number of ratings.
subdf.orderBy(f.desc("amount")).show(3)

### Finding the top 3 oldest movies

In [None]:
# Parse the release-date strings (pattern d-MMM-yyyy) into dates; values that do
# not parse are expected to come back as null.
release_date = f.to_date(f.col("releaseDate"), "d-MMM-yyyy").alias("releaseDate")
dtf = spark_item.select("movieTitle", release_date)

In [None]:
# The three earliest release dates, ignoring movies without a parsed date.
dtf.where(f.col("releaseDate").isNotNull()).orderBy("releaseDate").show(3)

### Finding the details of the user who rated the most movies




In [None]:
# Single user with the most ratings ("cuentas" is Spanish for "counts").
the_user = (
    spark_udata
    .groupBy("userID")
    .agg(f.count("*").alias("cuentas"))
    .orderBy(f.desc("cuentas"))
    .limit(1)
)

In [None]:
# Attach the demographic columns of that user.
the_user.join(spark_users, on="userID", how="inner").show()

### Finding the 3 most-rated movies

In [18]:
# The three items with the highest number of ratings.
the_item = (
    spark_udata
    .groupBy("itemID")
    .agg(f.count("*").alias("amount of ratings"))
    .orderBy(f.desc("amount of ratings"))
    .limit(3)
)

In [None]:
# A join does not preserve the row order of its inputs, so sort again after
# joining; otherwise the top-3 movies may be listed in an arbitrary order.
(
    the_item
    .join(spark_item, the_item.itemID == spark_item.movieID)
    .orderBy(f.desc("amount of ratings"))
    .show()
)

### Counting users by gender

In [None]:
# Number of users per gender value, smallest group first.
spark_users.groupBy("gender").count().sort(f.asc("count")).show()

### Finding the most common occupations among women

In [10]:
# Occupation counts among users whose gender is "f" (case-insensitive), most common first.
is_female = f.lower(f.col("gender")) == 'f'
ocp_stats = spark_users.where(is_female).groupBy("occupation").count().sort(f.desc("count"))

In [None]:
# Rename the column for display. withColumn() would instead add a second,
# duplicate "occupations" column next to the original "occupation" one.
ocp_stats.withColumnRenamed("occupation", "occupations").show()

### Finding the most common occupations among men


In [16]:
# Occupation counts among users whose gender is "m" (case-insensitive), most common first.
is_male = f.lower(f.col("gender")) == 'm'
ocp_stats = spark_users.where(is_male).groupBy("occupation").count().sort(f.desc("count"))

In [None]:
# Rename the column for display. withColumn() would instead add a second,
# duplicate "occupations" column next to the original "occupation" one.
ocp_stats.withColumnRenamed("occupation", "occupations").show()

### Using a broadcast variable as a lookup table and defining a UDF to get the movie names

In [28]:
# Small hand-written lookup table (movie ID -> title); it only covers a couple of
# movies, so other IDs will map to null in the lookup below.
most_popular = [
    {'movieID': 50, 'movieTitle': 'Star Wars (1977)'},
    {'movieID': 56, 'movieTitle': 'Pulp Fiction (1994)'},
]

In [29]:
# Share the lookup table with the executors as a read-only broadcast variable.
sc = spark.sparkContext
my_var = sc.broadcast(most_popular)

In [30]:
# The 20 most-rated items, with the rating count in a column named "total".
results = (
    spark_udata
    .groupBy('itemID')
    .count()
    .withColumnRenamed('count', 'total')
    .orderBy(f.desc('total'))
    .limit(20)
)

In [31]:
def my_func(idx):
  """Return the title for movie `idx` from the broadcast lookup table, or None if absent.

  The first matching entry wins, mirroring a linear scan of `my_var.value`.
  """
  return next(
      (entry['movieTitle'] for entry in my_var.value if entry['movieID'] == idx),
      None,
  )


In [32]:
# Wrap my_func as a Spark UDF; its output is typed as string (the udf default).
lookup = f.udf(my_func, returnType="string")

In [None]:
# Name the UDF output explicitly; otherwise Spark labels the column "my_func(itemID)".
# Items not present in the broadcast lookup table show null.
results.select('itemID', 'total', lookup(f.col('itemID')).alias('movieTitle')).show()