In [1]:
!apt-get update

!apt-get install openjdk-8-jdk-headless -qq > /dev/null

import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java

!java -version

!pip install pyspark


Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [697 B]
Hit:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:7 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:8 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Get:9 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Get:11 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:12 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Ign:13 https://developer.download.nvidia.com/compute/c

In [3]:
# Apache Spark: a general-purpose framework for parallel data processing.

# It can run on top of Hadoop.

# Big-data processing and machine learning on big data ===> Hadoop + Spark
from pyspark.sql.functions import *
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType,IntegerType,DateType,DoubleType
from pyspark.sql import SparkSession #DataFrame objeleri yaratmak için gerekli
from pyspark.sql.types import StructType,StructField
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType
from pyspark.ml.feature import StringIndexer,VectorAssembler
from pyspark.ml.classification import RandomForestClassifier


# Create the notebook's SparkSession, or reuse one that is already running.
# The broadcast timeout is raised to 100000 via spark.sql.broadcastTimeout.
spark = (
    SparkSession.builder
    .appName("Spark Dataframe Intro")
    .config("spark.sql.broadcastTimeout", "100000")
    .getOrCreate()
)

#Spark SQL


# Number of rating rows kept for the rest of the notebook.
RATINGS_SAMPLE_ROWS = 5000

# Load the raw ratings: the first CSV line supplies the column names and the
# column types are inferred from the data.
ratings_raw = (
    spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv("ratings.csv")
)

# Schema of the file as loaded (before the timestamp conversion below).
ratings_raw.printSchema()

# Convert the epoch-seconds `timestamp` column to a calendar date.
# This is done with the DataFrame API and the result is registered as the
# "ratings" view exactly once: replacing a temp view with a query that reads
# the same view is rejected as a recursive view by Spark 3.1+.
ratings_dated = ratings_raw.selectExpr(
    "userId",
    "movieId",
    "rating",
    "to_date(from_unixtime(cast(timestamp as bigint))) as timestamp",
)
ratings_dated.createOrReplaceTempView("ratings")

# Work on a small subset. NOTE: without an ORDER BY, which rows are kept is
# not guaranteed to be the same from run to run.
ratings = ratings_dated.limit(RATINGS_SAMPLE_ROWS)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [6]:
# Reshape the long ratings table into a wide user x movie matrix: one row per
# userId, one column per movieId holding the summed rating, with 0 filled in
# where a user has no rating for a movie.
rating = (
    ratings
    .groupBy("userId")
    .pivot("movieId")
    .sum("rating")
    .fillna(0)
)

In [7]:
# Pack all columns of `rating` except the first one into a single vector
# column called "features", then keep only the user id and that vector.
feature_columns = rating.columns[1:]
vec = VectorAssembler(inputCols=feature_columns, outputCol="features")
rating2 = vec.transform(rating).select("userId", "features")


In [8]:
# Build every (user, user) combination with a cross join of the feature table
# against itself, so a distance can be computed for each pair of vectors.
rating2.createOrReplaceTempView("cartesian")
user_pairs_query = (
    "select a.userId as userıd1,a.features as features1,"
    "b.features as features2 ,b.userID AS userID2 "
    "from cartesian a cross join cartesian b"
)
rating3 = spark.sql(user_pairs_query)

In [10]:
# Compute the Euclidean distance between the two feature vectors of each
# user pair and store it in a new `distance` column.
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType
from scipy.spatial import distance


def _euclidean(x, y):
    """Return the Euclidean distance between vectors x and y as a Python float."""
    return float(distance.euclidean(x, y))


distance_udf = F.udf(_euclidean, FloatType())
df2 = rating3.withColumn(
    'distance',
    distance_udf(F.col('features2'), F.col('features1')),
)

In [12]:
# For every user, list the 10 most similar other users (smallest Euclidean
# distance first), ranked with DENSE_RANK.

# The query below reads the pairwise distances through the "finaldata" view,
# so register it here; otherwise the cell fails on a fresh kernel.
df2.createOrReplaceTempView("finaldata")

# Notes on the query:
# - A user is excluded from their own list by comparing ids rather than by
#   `distance > 0`, so that a *different* user with an identical rating
#   vector (distance 0) is still reported as the closest match.
# - DENSE_RANK gives tied distances the same rank, so a user can have more
#   than 10 rows when ties occur.
# - The ORDER BY is on the outer query; ordering inside the subquery does not
#   guarantee the order of the final result.
top_similar_users = spark.sql("""
    select userıd1, similatiry_degree, similiar_user_of_users, distance
    from (
        select userıd1,
               dense_rank() over (partition by userıd1 order by distance asc) as similatiry_degree,
               userID2 as similiar_user_of_users,
               distance
        from finaldata
        where userıd1 <> userID2
    )
    where similatiry_degree <= 10
    order by userıd1, similatiry_degree, similiar_user_of_users
""")
top_similar_users.show(100)

+-------+-----------------+----------------------+---------+
|userıd1|similatiry_degree|similiar_user_of_users| distance|
+-------+-----------------+----------------------+---------+
|      1|                1|                    26| 67.69786|
|      1|                2|                     8| 68.88396|
|      1|                3|                    13| 68.90573|
|      1|                4|                    31|   68.942|
|      1|                5|                     5|68.985504|
|      1|                6|                    14| 69.26759|
|      1|                7|                     3|69.336136|
|      1|                8|                    25| 69.60963|
|      1|                9|                    17|70.039276|
|      1|               10|                     9| 70.19259|
|      2|                1|                    26|26.424421|
|      2|                2|                    25| 28.87473|
|      2|                3|                     3|29.457596|
|      2|               