### **Dithering Sample code using Pandas and Numpy**

In [36]:
import numpy as np
import pandas as pd

In [39]:
epsilon = 2.0
columns = ['user','scores']
data = [('ABC', 2.0), ('XYZ', 3.0), ('PQR', 2.5)]
df = pd.DataFrame(data,columns=columns)

In [45]:
def dither_recommendations(recommendations:List[Dict],epsilon:float):
  sd = math.sqrt(math.log(epsilon)) if epsilon > 1.0 else 1e-10
  distribution = np.random.normal(0, sd, 1000)
  recommendations = recommendations.sort_values(by=['user','scores'], ascending=[True,False])
  recommendations['ditherscores'] = recommendations['scores'].map(lambda x: math.log(x+1)+np.random.choice(distribution, size=1)[0])
  recommendations = recommendations.sort_values(by=['ditherscores'],ascending = True)
  return recommendations

In [46]:
recomm = dither_recommendations(df,epsilon)

In [47]:
recomm.head()

Unnamed: 0,user,scores,ditherscores
0,ABC,2.0,0.902694
1,XYZ,3.0,1.597688
2,PQR,2.5,3.293422


### **Dithering implementation using PySpark**

In [48]:
import math
from typing import List, Dict
from pyspark.sql.functions import col
from pyspark.mllib.random import RandomRDDs
from pyspark.context import SparkContext
from pyspark.sql import SparkSession


def dither_recommendation_for_user(recommendations:List[Dict],epsilon:float):
  sd = math.sqrt(math.log(epsilon)) if epsilon > 1.0 else 1e-10
  distribution = RandomRDDs.logNormalRDD(spark.sparkContext, mean = 0, std=sd,size = 1000, seed=2)
  recom = recommendations.rdd.sortBy(lambda x: (x[0],-x[1])).zipWithIndex().map(lambda x: (x[0],math.log(x[1]+1)+distribution.sample))
  return recom


In [49]:
epsilon = 2.0
columns = ['user','scores']
data = [('ABC', 2.0), ('XYZ', 3.0), ('PQR', 2.5)]
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
rdd = spark.sparkContext.parallelize(data)
recommendations = rdd.toDF(columns)
recom = dither_recommendation_for_user(recommendations,epsilon)