In [1]:

import os, sys
import glob

# Configure this plain Jupyter kernel to start PySpark from the cluster's
# Spark 2 client, then run Spark's own interactive bootstrap (shell.py),
# which creates `spark` (SparkSession) in this namespace.

# Interpreter used by PySpark. It is set once, to the value that was actually
# in effect before Spark started.
# NOTE(review): '/opt/anaconda/envs/bd9/bin/python' was also assigned here
# (and immediately overwritten); confirm which interpreter executors should use.
os.environ["PYSPARK_PYTHON"] = 'python3'
os.environ["SPARK_HOME"] = '/usr/hdp/current/spark2-client'
os.environ["PYSPARK_SUBMIT_ARGS"] = '--num-executors 3 --repositories https://repos.spark-packages.org/ pyspark-shell '

spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')

sys.path.insert(0, os.path.join(spark_home, 'python'))

# Locate the py4j zip bundled with this Spark instead of hard-coding its
# version; fall back to the previously hard-coded path if none is found.
py4j_zips = sorted(glob.glob(os.path.join(spark_home, 'python/lib/py4j-*-src.zip')))
py4j_zip = py4j_zips[-1] if py4j_zips else os.path.join(spark_home, 'python/lib/py4j-0.10.4-src.zip')
sys.path.insert(0, py4j_zip)

# Run shell.py in this namespace. `with` closes the file handle, and compiling
# with the real path gives readable tracebacks if the bootstrap fails.
shell_py = os.path.join(spark_home, 'python/pyspark/shell.py')
with open(shell_py) as shell_src:
    exec(compile(shell_src.read(), shell_py, 'exec'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.2.3.1.4.0-315
      /_/

Using Python version 3.6.5 (default, Apr 29 2018 16:14:56)
SparkSession available as 'spark'.


In [2]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType

In [3]:
# Weight of the prior in the damped (Bayesian) average:
# p6 = (rating_sum + k * Mu) / (n_ratings + k), i.e. k pseudo-ratings at the global mean.
k = 10

# z-score paired with its confidence label, used by the Wilson lower bound.
w_score = [3.291, u'99,90%']

# Derive z from w_score so the value is defined in exactly one place.
z = w_score[0]


In [4]:

# Column layout of MovieLens-100k `u.data` (tab-separated, no header):
#   user id | item id | rating | timestamp
# Downstream cells only use `film_id` and `Rating`. The first and last columns
# are named for what they actually contain (user id and unix timestamp).
schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("film_id", StringType(), True),
    StructField("Rating", IntegerType(), True),
    StructField("timestamp", IntegerType(), True),
])

In [5]:
# Load the ratings file: tab-separated, no header row, typed by `schema`.
df = (
    spark.read
    .schema(schema)
    .option("header", False)
    .option("sep", "\t")
    .csv('hdfs:/labs/lab05data/ml-100k/u.data')
)

In [6]:
from pyspark.sql import functions as F

In [8]:
# Per-film aggregates over all ratings.
films_group = (
    df.groupBy('film_id')
      .agg(
          F.count('film_id').alias('p1'),   # p1: number of ratings
          F.sum('Rating').alias('rs'),      # rs: sum of ratings
      )
      .withColumn('p2', F.col('rs') / F.col('p1'))  # p2: plain average rating
)

In [9]:
# p3: number of positive ratings (strictly above 3) per film.
film_pos = (
    df.where(F.col('Rating') > 3)
      .groupBy('film_id')
      .agg(F.count(F.lit(1)).alias('p3'))
)

In [10]:
# Attach the positive-rating count p3 to the per-film aggregates and compute
# p4 = share of positive ratings (p3 / p1).
# Start from films_group with a LEFT join so films that never received a
# rating above 3 are kept; an inner join would silently drop them from every
# ranking. Their missing p3 is filled with 0, giving p4 = 0.
film_pos = (
    films_group
    .join(film_pos, 'film_id', 'left')
    .fillna(0, subset=['p3'])
    .withColumn('p4', F.col('p3') / F.col('p1'))
)

In [13]:
from pyspark.sql.functions import mean

In [14]:
# Global mean rating over all rows; the prior for the damped average (p6).
Mu = df.agg(mean('Rating')).first()[0]

In [15]:
# p6: damped (Bayesian) average per film, (rating_sum + k*Mu) / (n_ratings + k).
# Only film_id and p6 are kept, so joining onto film_pos does not duplicate p1/rs.
prior_mass = k * Mu
films_group = (
    df.groupBy('film_id')
      .agg(F.count('film_id').alias('p1'), F.sum('Rating').alias('rs'))
      .withColumn('p6', (F.col('rs') + prior_mass) / (F.col('p1') + k))
      .drop('p1', 'rs')
)

In [16]:
# Attach the damped average p6 to the per-film table.
film_pos = film_pos.join(films_group, on="film_id", how="inner")

In [19]:
from pyspark.sql.functions import sqrt

In [45]:
# Wilson score lower bound, with phat = p4 (share of positive ratings) and
# n = p1 (number of ratings):
#
#   (phat + z^2/(2n) - z*sqrt((phat*(1 - phat) + z^2/(4n)) / n)) / (1 + z^2/n)
#
# This cell precomputes two pieces as helper columns:
#   wilson_part3 = 1 + z^2/n                           (denominator)
#   wilson_part2 = (phat*(1 - phat) + z^2/(4n)) / n    (term under the sqrt)
z2 = z*z

n_col = F.col("p1")
phat_col = F.col("p4")

wils = (
    film_pos
    .withColumn("wilson_part3", 1 + z2 / n_col)
    .withColumn("wilson_part2", (phat_col * (1 - phat_col) + z2 / (4 * n_col)) / n_col)
)



In [46]:
# Finish the Wilson lower bound from the helper columns, then drop them:
#   wilson_lower = (p4 + z^2/(2*p1) - z*sqrt(wilson_part2)) / wilson_part3
wilson_numerator = (
    F.col("p4")
    + z2 / (2 * F.col("p1"))
    - z * sqrt(F.col("wilson_part2"))
)
wils = (
    wils
    .withColumn("wilson_lower", wilson_numerator / F.col("wilson_part3"))
    .drop("wilson_part3", "wilson_part2")
)

In [29]:
# Film titles from MovieLens-100k `u.item` ('|'-separated, no header).
# Column _c0 is the item id and _c1 is the title.
# The file is Latin-1 encoded. Reading it with Spark's default UTF-8 garbles
# accented titles, and film_name is used as a sort tie-breaker in the rankings.
fi = spark.read.csv(
    'hdfs:/labs/lab05data/ml-100k/u.item',
    header=False, sep='|', encoding='ISO-8859-1')
fi = fi.select("_c0", "_c1").toDF("film_id", "film_name")

In [30]:
# Attach film titles (inner join on film_id). No column drop is needed here:
# the Wilson helper columns are already dropped when wilson_lower is computed.
wils = wils.join(fi, "film_id", "inner")

In [32]:
# One slot per ranking, in output order; each is filled with 10 film ids below.
result = {name: [] for name in ("top10_rates", "top10_average", "top10_rating", "top10_lower")}

In [38]:
# Top 10 films by number of ratings (p1, descending).
# Ties break on title, then on numeric film id. Titles are not guaranteed to
# be unique, and Spark's order among fully tied rows is not deterministic.
result["top10_rates"] = [
    int(row['film_id'])
    for row in wils.sort(F.col("p1").desc(),
                         F.col("film_name").asc(),
                         F.col("film_id").cast("int").asc())
                   .select("film_id").limit(10).collect()
]

In [34]:
# Top 10 films by plain average rating (p2, descending).
# Ties break on title, then on numeric film id. Titles are not guaranteed to
# be unique, and Spark's order among fully tied rows is not deterministic.
result["top10_average"] = [
    int(row['film_id'])
    for row in wils.sort(F.col("p2").desc(),
                         F.col("film_name").asc(),
                         F.col("film_id").cast("int").asc())
                   .select("film_id").limit(10).collect()
]

In [39]:
# Top 10 films by damped (Bayesian) average (p6, descending).
# Ties break on title, then on numeric film id. Titles are not guaranteed to
# be unique, and Spark's order among fully tied rows is not deterministic.
result["top10_rating"] = [
    int(row['film_id'])
    for row in wils.sort(F.col("p6").desc(),
                         F.col("film_name").asc(),
                         F.col("film_id").cast("int").asc())
                   .select("film_id").limit(10).collect()
]

In [36]:
# Top 10 films by Wilson lower bound of the positive share (wilson_lower, descending).
# Ties break on title, then on numeric film id. Titles are not guaranteed to
# be unique, and Spark's order among fully tied rows is not deterministic.
result["top10_lower"] = [
    int(row['film_id'])
    for row in wils.sort(F.col("wilson_lower").desc(),
                         F.col("film_name").asc(),
                         F.col("film_id").cast("int").asc())
                   .select("film_id").limit(10).collect()
]

In [40]:
# Display the four collected top-10 lists of film ids.
result

{'top10_rates': [50, 258, 100, 181, 294, 286, 288, 1, 300, 121],
 'top10_average': [1536, 1653, 814, 1201, 1189, 1467, 1500, 1599, 1293, 1122],
 'top10_rating': [318, 483, 64, 408, 169, 12, 603, 50, 114, 178],
 'top10_lower': [64, 98, 318, 479, 50, 483, 603, 427, 357, 12]}

In [41]:
import json

# Serialize the rankings dict to lab05s.json in the working directory.
with open("lab05s.json", "w") as outfile:
    outfile.write(json.dumps(result))