In [183]:
!pip install pyspark



In [184]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.regression import LinearRegression
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import (VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer)

spark = SparkSession.builder.appName("BigData - Project 1").getOrCreate()

In [185]:
import codecs
from pyspark.sql import functions as func

def loadMovieNames():
    movieNames = {}
    # CHANGE THIS TO THE PATH TO YOUR u.ITEM FILE:
    with codecs.open("/content/drive/MyDrive/BIGDATA/Data/ml-100k/u.item", "r", encoding='ISO-8859-1', errors='ignore') as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

Funkcija za dodavanje imena filma, na osnovu movieID-ja

In [186]:
nameDict = spark.sparkContext.broadcast(loadMovieNames())
def lookupName(movieID):
    return nameDict.value[movieID]

lookupNameUDF = func.udf(lookupName)

Učitavanje baze sa kolonama(userID, movieID, rating i timetsamp), korišćenjem adekvatne šeme

In [187]:
from pyspark.sql.types import IntegerType, StructType, StructField, LongType, StringType
schema = StructType([ \
                     StructField("userID", IntegerType(), True), \
                     StructField("movieID", IntegerType(), True), \
                     StructField("rating", IntegerType(), True), \
                     StructField("timestamp", LongType(), True)])

# Load up movie data as dataframe
moviesDF = spark.read.option("sep", "\t").schema(schema).csv('/content/drive/MyDrive/BIGDATA/Data/ml-100k/u.data')

In [188]:
moviesDF.show()

+------+-------+------+---------+
|userID|movieID|rating|timestamp|
+------+-------+------+---------+
|   196|    242|     3|881250949|
|   186|    302|     3|891717742|
|    22|    377|     1|878887116|
|   244|     51|     2|880606923|
|   166|    346|     1|886397596|
|   298|    474|     4|884182806|
|   115|    265|     2|881171488|
|   253|    465|     5|891628467|
|   305|    451|     3|886324817|
|     6|     86|     3|883603013|
|    62|    257|     2|879372434|
|   286|   1014|     5|879781125|
|   200|    222|     5|876042340|
|   210|     40|     3|891035994|
|   224|     29|     3|888104457|
|   303|    785|     3|879485318|
|   122|    387|     5|879270459|
|   194|    274|     2|879539794|
|   291|   1042|     4|874834944|
|   234|   1184|     2|892079237|
+------+-------+------+---------+
only showing top 20 rows



# **Najpopularniji filmovi**

**50 Najpopularnijih filmova**

Kako bismo dobili samo 50 najpopularnijih filmova, prvo grupisemo sve instance na osnovu movieID-ja i izbrojimo redove za svaki ID. Postavimo da kolona count treba da ide opadajuće, a sa limit biramo samo prvih 50 redova.

In [189]:
movieCounts = moviesDF.groupBy("movieID").count().orderBy('count',ascending = False)

moviesWithNames = movieCounts.withColumn("movieTitle", lookupNameUDF(func.col("movieID")))

top50 = moviesWithNames.limit(50)
top50.show(50)

+-------+-----+--------------------+
|movieID|count|          movieTitle|
+-------+-----+--------------------+
|     50|  583|    Star Wars (1977)|
|    258|  509|      Contact (1997)|
|    100|  508|        Fargo (1996)|
|    181|  507|Return of the Jed...|
|    294|  485|    Liar Liar (1997)|
|    286|  481|English Patient, ...|
|    288|  478|       Scream (1996)|
|      1|  452|    Toy Story (1995)|
|    300|  431|Air Force One (1997)|
|    121|  429|Independence Day ...|
|    174|  420|Raiders of the Lo...|
|    127|  413|Godfather, The (1...|
|     56|  394| Pulp Fiction (1994)|
|      7|  392|Twelve Monkeys (1...|
|     98|  390|Silence of the La...|
|    237|  384|Jerry Maguire (1996)|
|    117|  378|    Rock, The (1996)|
|    172|  367|Empire Strikes Ba...|
|    222|  365|Star Trek: First ...|
|    204|  350|Back to the Futur...|
|    313|  350|      Titanic (1997)|
|    405|  344|Mission: Impossib...|
|     79|  336|Fugitive, The (1993)|
|    210|  331|Indiana Jones and...|
|

**10 filmova sa najvise i 10 sa najmanje pregleda**

Radi se na sličan način kao predhodni primer, grupisemo po movieID-ju i izbrojimo instance za svaku grupu. Za filmove sa najviše pregleda se samo postavi da po koloni Most popular ide opadajuće, a za 10 filmova za najmanje pregleda rastuće.

In [190]:
moviesCountAvg = moviesDF.groupBy("movieID").agg(func.avg('rating').alias('Avg_rating'),func.count('movieID').alias('Most_popular'))

moviesWithNames = moviesCountAvg.withColumn("movieTitle", lookupNameUDF(func.col("movieID")))

moviesCountAvgAsc = moviesWithNames.orderBy('Most_popular')

moviesCountAvgAsc.show(10, False)

+-------+----------+------------+---------------------------------------------------------+
|movieID|Avg_rating|Most_popular|movieTitle                                               |
+-------+----------+------------+---------------------------------------------------------+
|1352   |1.0       |1           |Shadow of Angels (Schatten der Engel) (1976)             |
|1650   |4.0       |1           |Butcher Boy, The (1998)                                  |
|1525   |4.0       |1           |Object of My Affection, The (1998)                       |
|1339   |1.0       |1           |Stefano Quantestorie (1993)                              |
|1561   |1.0       |1           |Tigrero: A Film That Was Never Made (1994)               |
|1618   |1.0       |1           |King of New York (1990)                                  |
|1645   |4.0       |1           |Butcher Boy, The (1998)                                  |
|857    |3.0       |1           |Paris Was a Woman (1995)                       

In [191]:
moviesCountAvgDesc= moviesWithNames.orderBy('Most_popular',ascending=False)

moviesCountAvgDesc.show(10, False)

+-------+------------------+------------+-----------------------------+
|movieID|Avg_rating        |Most_popular|movieTitle                   |
+-------+------------------+------------+-----------------------------+
|50     |4.3584905660377355|583         |Star Wars (1977)             |
|258    |3.8035363457760316|509         |Contact (1997)               |
|100    |4.155511811023622 |508         |Fargo (1996)                 |
|181    |4.007889546351085 |507         |Return of the Jedi (1983)    |
|294    |3.156701030927835 |485         |Liar Liar (1997)             |
|286    |3.656964656964657 |481         |English Patient, The (1996)  |
|288    |3.4414225941422596|478         |Scream (1996)                |
|1      |3.8783185840707963|452         |Toy Story (1995)             |
|300    |3.6310904872389793|431         |Air Force One (1997)         |
|121    |3.438228438228438 |429         |Independence Day (ID4) (1996)|
+-------+------------------+------------+-----------------------

**Izracunavanje skor-a i prikaz 5 najboljih filmova na osnovu skor-a**

Kako bismo izračunali formulu za dati skor, prvo smo izdvojili vrednosti maxCount i maxAvg. A potop sa withColumn napravili novu kolonu Score, u okviru koje će se nalaziti vrednosti dobijene formulom (broj_pregleda*prosečna_ocena)/(max(broj_pregleda)*max(prosečna ocena)).

In [192]:
maxCount = moviesWithNames.agg(func.max('Most_popular')).first().asDict()['max(Most_popular)']
maxAvg = moviesWithNames.agg(func.max('Avg_rating')).first().asDict()['max(Avg_rating)']

In [193]:
maxCount

583

In [194]:
moviesWithScore = moviesWithNames.withColumn\
 ('Score',(moviesWithNames.Most_popular*moviesWithNames.Avg_rating)/(maxCount*maxAvg))

moviesWithScore.orderBy('Score',ascending=False).show(5)

+-------+------------------+------------+--------------------+------------------+
|movieID|        Avg_rating|Most_popular|          movieTitle|             Score|
+-------+------------------+------------+--------------------+------------------+
|     50|4.3584905660377355|         583|    Star Wars (1977)|0.8716981132075472|
|    100| 4.155511811023622|         508|        Fargo (1996)|0.7241852487135506|
|    181| 4.007889546351085|         507|Return of the Jed...|0.6970840480274443|
|    258|3.8035363457760316|         509|      Contact (1997)|0.6641509433962264|
|    174| 4.252380952380952|         420|Raiders of the Lo...|0.6126929674099485|
+-------+------------------+------------+--------------------+------------------+
only showing top 5 rows



**5 najpopularnijih po polu filmova**

In [195]:
schemaDemo = StructType([ \
                     StructField("userID", IntegerType(), True), \
                     StructField("age", IntegerType(), True), \
                     StructField("gender", StringType(), True), \
                     StructField("occupation", StringType(), True)],)

# Load up movie data as dataframe
demographics = spark.read.option("sep", "|").schema(schemaDemo).csv('/content/drive/MyDrive/BIGDATA/Data/ml-100k/u.user')

Ovde smo morali da učitamo novu tabelu kako bismo ispunili zahtev, u okviru ove tabele(demographics) nalaze se informacije o korisnicima koji ocenjuju filmove.

In [196]:
demographics.show()

+------+---+------+-------------+
|userID|age|gender|   occupation|
+------+---+------+-------------+
|     1| 24|     M|   technician|
|     2| 53|     F|        other|
|     3| 23|     M|       writer|
|     4| 24|     M|   technician|
|     5| 33|     F|        other|
|     6| 42|     M|    executive|
|     7| 57|     M|administrator|
|     8| 36|     M|administrator|
|     9| 29|     M|      student|
|    10| 53|     M|       lawyer|
|    11| 39|     F|        other|
|    12| 28|     F|        other|
|    13| 47|     M|     educator|
|    14| 45|     M|    scientist|
|    15| 49|     F|     educator|
|    16| 21|     M|entertainment|
|    17| 30|     M|   programmer|
|    18| 35|     F|        other|
|    19| 40|     M|    librarian|
|    20| 42|     F|    homemaker|
+------+---+------+-------------+
only showing top 20 rows



Spajamo tabele moviesDemo u okviru kojih se nalaze informacije o filmovi sa ocenom koji je odredjeni korisnik dao(na osnovu userID-ja), spajamo sa bazom o informacijama o korisniku.
Kako bismo dobili najpopularnije filmove za svaki pol novodobijenu tabelu postavimo uslov u where-u pza pol(prvo stavimo jedan pol, pa onda drugi), a potop grupisemo po filmu i izbrojimo instance za svaki film. Kkako bismo dobili top 5 filmova na osnovu kolone count postavimo da se filmovi prikazu opadajuće.

In [197]:
moviesDemo = moviesDF.withColumn("Title", lookupNameUDF(func.col("movieID")))
moviesDemo = moviesDemo.join(demographics,'userID', 'inner')

moviesDemo.select('Title').where(moviesDemo.gender == 'M').groupBy('Title').agg(func.count('Title').alias('count')).orderBy('count',ascending=False).show(5)

+--------------------+-----+
|               Title|count|
+--------------------+-----+
|    Star Wars (1977)|  432|
|        Fargo (1996)|  383|
|Return of the Jed...|  383|
|      Contact (1997)|  372|
|    Liar Liar (1997)|  344|
+--------------------+-----+
only showing top 5 rows



In [198]:
moviesDemo.select('Title').where(moviesDemo.gender == 'F').groupBy('Title').agg(func.count('Title').alias('count')).orderBy('count',ascending=False).show(5)

+--------------------+-----+
|               Title|count|
+--------------------+-----+
|English Patient, ...|  152|
|    Star Wars (1977)|  151|
|       Scream (1996)|  143|
|    Liar Liar (1997)|  141|
|      Contact (1997)|  137|
+--------------------+-----+
only showing top 5 rows



In [199]:
moviesDemo.show(5)

+------+-------+------+---------+--------------------+---+------+----------+
|userID|movieID|rating|timestamp|               Title|age|gender|occupation|
+------+-------+------+---------+--------------------+---+------+----------+
|   196|    242|     3|881250949|        Kolya (1996)| 49|     M|    writer|
|   186|    302|     3|891717742|L.A. Confidential...| 39|     F| executive|
|    22|    377|     1|878887116| Heavyweights (1994)| 25|     M|    writer|
|   244|     51|     2|880606923|Legends of the Fa...| 28|     M|technician|
|   166|    346|     1|886397596| Jackie Brown (1997)| 47|     M|  educator|
+------+-------+------+---------+--------------------+---+------+----------+
only showing top 5 rows



**3 najpopularnija filma po polu i zanimanju korisnika.**

Za ovaj zahtev napravljena je posebna funkcija showOccupation. Ona u uslov stavlja zadat pol i zanimanje(kako bi se za racunanje koristile samo adekvatne instance), a potop grupiše po nazivu filma i broji instance za svaki film. Kkao bi se prikazala samo top 3 filma, stavljamo da se filmovi prikazuju u opadajućem redosledu.

In [200]:
occupations = moviesDemo.select('occupation').distinct();
occupations = [row.occupation for row in occupations.collect()]

In [201]:
def showOccupation(occ,sex):
  moviesDemo.select('Title', 'occupation').where(moviesDemo.gender == sex).where(moviesDemo.occupation==occ)\
  .groupBy('Title','occupation').agg(func.count('Title').alias('count'))\
  .orderBy('count',ascending=False).show(3)

In [202]:
for occupation in occupations:
  showOccupation(occupation,"F")

+--------------------+----------+-----+
|               Title|occupation|count|
+--------------------+----------+-----+
|English Patient, ...| librarian|   21|
|        Fargo (1996)| librarian|   17|
|    Star Wars (1977)| librarian|   17|
+--------------------+----------+-----+
only showing top 3 rows

+--------------------+----------+-----+
|               Title|occupation|count|
+--------------------+----------+-----+
|Ice Storm, The (1...|   retired|    2|
|         Dave (1993)|   retired|    1|
|  Ulee's Gold (1997)|   retired|    1|
+--------------------+----------+-----+
only showing top 3 rows

+--------------------+----------+-----+
|               Title|occupation|count|
+--------------------+----------+-----+
|      Volcano (1997)|    lawyer|    2|
|English Patient, ...|    lawyer|    2|
|Tomorrow Never Di...|    lawyer|    2|
+--------------------+----------+-----+
only showing top 3 rows

+--------------------+----------+-----+
|               Title|occupation|count|
+----

In [203]:
for occupation in occupations:
  showOccupation(occupation,"M")

+--------------------+----------+-----+
|               Title|occupation|count|
+--------------------+----------+-----+
|English Patient, ...| librarian|   14|
|      Titanic (1997)| librarian|   13|
|  Chasing Amy (1997)| librarian|   11|
+--------------------+----------+-----+
only showing top 3 rows

+--------------------+----------+-----+
|               Title|occupation|count|
+--------------------+----------+-----+
|        Fargo (1996)|   retired|    9|
|Jerry Maguire (1996)|   retired|    8|
|    Liar Liar (1997)|   retired|    8|
+--------------------+----------+-----+
only showing top 3 rows

+--------------------+----------+-----+
|               Title|occupation|count|
+--------------------+----------+-----+
|Full Monty, The (...|    lawyer|    8|
|English Patient, ...|    lawyer|    7|
|    Star Wars (1977)|    lawyer|    7|
+--------------------+----------+-----+
only showing top 3 rows

+--------------------+----------+-----+
|               Title|occupation|count|
+----

**3 najpopularnija filma u svakom žanru.**

Ponovo se učitava potrebna baza, a zatim samo izvuku iz nje jedinstvena imena žanrova.

In [204]:
schemaGenre= StructType([StructField("genre", StringType(), True)],)

genre = spark.read.schema(schemaGenre).csv('/content/drive/MyDrive/BIGDATA/Data/ml-100k/u.genre')

genres = genre.select('genre').distinct();
genres = [genre.genre.split('|')[0] for genre in genres.collect()]
genres

['Crime',
 'Action',
 'Drama',
 'Western',
 'Horror',
 'unknown',
 'Animation',
 'Mystery',
 'Thriller',
 'Documentary',
 'War',
 'Sci-Fi',
 'Film-Noir',
 'Musical',
 'Comedy',
 'Romance',
 'Adventure',
 'Fantasy',
 "Children's"]

Učitavamo i tabelu moviesInfo u okviru koje su podaci vezani za film u smislu kom žanru film pripada, kao i datum premijera filma.

In [205]:
# Load up movie data as dataframe

schemaInfo = StructType([ \
                     StructField("movieID", IntegerType(), True), \
                     StructField("Title", StringType(), True), \
                     StructField("release date", StringType(), True), \
                     StructField("timestamp", IntegerType(), True),\
                      StructField("imdb", StringType(), True), \
                      StructField("unknown", IntegerType(), True), \
                      StructField("Action", IntegerType(), True), \
                      StructField("Adventure", IntegerType(), True), \
                      StructField("Animation", IntegerType(), True), \
                      StructField("Children's", IntegerType(), True), \
                      StructField("Comedy", IntegerType(), True), \
                      StructField("Crime", IntegerType(), True), \
                      StructField("Documentary", IntegerType(), True), \
                      StructField("Drama", IntegerType(), True), \
                      StructField("Fantasy", IntegerType(), True), \
                      StructField("Film-Noir", IntegerType(), True), \
                      StructField("Horror", IntegerType(), True), \
                      StructField("Musical", IntegerType(), True), \
                      StructField("Mystery", IntegerType(), True), \
                      StructField("Romance", IntegerType(), True), \
                      StructField("Sci-Fi", IntegerType(), True), \
                      StructField("Thriller", IntegerType(), True), \
                      StructField("War", IntegerType(), True), \
                      StructField("Western", IntegerType(), True)],)

moviesInfo = spark.read.option("sep","|").option("inferSchema", "true")\
    .schema(schemaInfo).csv('/content/drive/MyDrive/BIGDATA/Data/ml-100k/u.item')
moviesInfo.show()

+-------+--------------------+------------+---------+--------------------+-------+------+---------+---------+----------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+
|movieID|               Title|release date|timestamp|                imdb|unknown|Action|Adventure|Animation|Children's|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|
+-------+--------------------+------------+---------+--------------------+-------+------+---------+---------+----------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+
|      1|    Toy Story (1995)| 01-Jan-1995|     NULL|http://us.imdb.co...|      0|     0|        0|        1|         1|     1|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0|
|      2|    GoldenEye (1995)| 01-Jan-1995|     NULL|http://us.imdb.co...|      0|  

Join-ujemo tabele kako bismo obezbedili u okviru jedne potrebne informacije za ispunjenje zahteva.

In [206]:
moviesGenre = moviesInfo.join(moviesDF,'movieID', 'inner')
moviesGenre.show()

+-------+--------------------+------------+---------+--------------------+-------+------+---------+---------+----------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+------+---------+
|movieID|               Title|release date|timestamp|                imdb|unknown|Action|Adventure|Animation|Children's|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|userID|rating|timestamp|
+-------+--------------------+------------+---------+--------------------+-------+------+---------+---------+----------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+------+---------+
|    242|        Kolya (1996)| 24-Jan-1997|     NULL|http://us.imdb.co...|      0|     0|        0|        0|         0|     1|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0|   196|     3

Napravljena je posebna funkcija showGenre, u okviru koje smo filmove filtrirali na osnovu toga kom žanru pripadaju. Potop se grupišu na osnovu naziva i žanra, izbroje se pojavljivanja instanci i filmovi predstave u opadajućem redoseldu na osnovu pojavljivanja. Kroz for smo prošli kroz sve žanrove i ispisali rezultate.

In [207]:
from pyspark.sql.functions import col
def showGenre(gen):
  moviesGenre.select('Title', col(gen)).where(col(gen) == 1).groupBy('Title',col(gen))\
  .agg(func.count('Title').alias('count'))\
  .orderBy('count',ascending=False).show(3)

In [208]:
for gen in genres:
  print(gen)
  showGenre(gen)

Crime
+--------------------+-----+-----+
|               Title|Crime|count|
+--------------------+-----+-----+
|        Fargo (1996)|    1|  508|
|Godfather, The (1...|    1|  413|
| Pulp Fiction (1994)|    1|  394|
+--------------------+-----+-----+
only showing top 3 rows

Action
+--------------------+------+-----+
|               Title|Action|count|
+--------------------+------+-----+
|    Star Wars (1977)|     1|  583|
|Return of the Jed...|     1|  507|
|Air Force One (1997)|     1|  431|
+--------------------+------+-----+
only showing top 3 rows

Drama
+--------------------+-----+-----+
|               Title|Drama|count|
+--------------------+-----+-----+
|      Contact (1997)|    1|  509|
|        Fargo (1996)|    1|  508|
|English Patient, ...|    1|  481|
+--------------------+-----+-----+
only showing top 3 rows

Western
+--------------------+-------+-----+
|               Title|Western|count|
+--------------------+-------+-----+
|Dances with Wolve...|      1|  256|
|Butch C

**Kreirati atribut kategorički starosna_grupa na osnovu atributa age  i prikazati 5 najpopularnijih filmova za svaku grupu.**

Za potrebe ovog zahteva odlučili smo se da korisnike podelimo u 5 starosnih grupa:

*   Children (1-12)
*   Teen (13-20)
*   Adult(21-40)
*   Older people(41-60)
*   Old people(older than 60)


To je izvršeno sa withColumn, uz koju je išlo 4 when uslova i 1 otherwise.





In [209]:
moviesAgeGroup = moviesDemo.withColumn("age_group",func.when(moviesDemo["age"] <=12,"Children")\
                                       .when((moviesDemo["age"] >12) & (moviesDemo["age"] <=20),"Teen")\
                                       .when((moviesDemo["age"] >20) & (moviesDemo["age"] <=40),"Adult").\
                                       when((moviesDemo["age"] >40) & (moviesDemo["age"] <=60),"Older people")\
                                       .otherwise("Old people"))

In [210]:
moviesAgeGroup.show(5)

+------+-------+------+---------+--------------------+---+------+----------+------------+
|userID|movieID|rating|timestamp|               Title|age|gender|occupation|   age_group|
+------+-------+------+---------+--------------------+---+------+----------+------------+
|   196|    242|     3|881250949|        Kolya (1996)| 49|     M|    writer|Older people|
|   186|    302|     3|891717742|L.A. Confidential...| 39|     F| executive|       Adult|
|    22|    377|     1|878887116| Heavyweights (1994)| 25|     M|    writer|       Adult|
|   244|     51|     2|880606923|Legends of the Fa...| 28|     M|technician|       Adult|
|   166|    346|     1|886397596| Jackie Brown (1997)| 47|     M|  educator|Older people|
+------+-------+------+---------+--------------------+---+------+----------+------------+
only showing top 5 rows



In [211]:
ageGroup=moviesAgeGroup.select('age_group').distinct();
ageGroup=[row.age_group for row in ageGroup.collect()]
ageGroup

['Teen', 'Older people', 'Adult', 'Old people', 'Children']

Napravljena je procedura showAgeGroup, u okviru koje se za datu starosnu grupu instance grupišu po nazivu filma, a potop izbroji broj instanca za svaki film i filmovi opadajuće predstave.

In [212]:
def showAgeGroup(group):
  moviesAgeGroup.select('Title').where(col('age_group') == group).groupBy('Title')\
  .agg(func.count('Title').alias('count'))\
  .orderBy('count',ascending=False).show(5)

In [213]:
for age in ageGroup:
  print(age)
  showAgeGroup(age)

Teen
+--------------------+-----+
|               Title|count|
+--------------------+-----+
|       Scream (1996)|   90|
|      Contact (1997)|   67|
|    Star Wars (1977)|   66|
|Return of the Jed...|   66|
|    Liar Liar (1997)|   63|
+--------------------+-----+
only showing top 5 rows

Older people
+--------------------+-----+
|               Title|count|
+--------------------+-----+
|English Patient, ...|  173|
|        Fargo (1996)|  139|
|    Star Wars (1977)|  136|
|      Contact (1997)|  128|
|Godfather, The (1...|  128|
+--------------------+-----+
only showing top 5 rows

Adult
+--------------------+-----+
|               Title|count|
+--------------------+-----+
|    Star Wars (1977)|  374|
|Return of the Jed...|  325|
|      Contact (1997)|  309|
|        Fargo (1996)|  309|
|    Liar Liar (1997)|  308|
+--------------------+-----+
only showing top 5 rows

Old people
+--------------------+-----+
|               Title|count|
+--------------------+-----+
|English Patient, ..

**Ukupna prosečna ocena svih filmova.**

Prvo je predstavljena prosečna vrednost ukupne opcene svih filmova.

In [214]:
moviesDemo.agg(func.avg('rating')).show()

+-----------+
|avg(rating)|
+-----------+
|    3.52986|
+-----------+



Predstavljena je vprosečna ocena svakog filma posebno.

In [215]:
moviesDemo.select('Title','rating').groupBy('Title').agg(func.round(func.avg('rating'),2).alias('Avg-Rating')).orderBy('Avg-Rating',ascending=False).show(30)

+--------------------+----------+
|               Title|Avg-Rating|
+--------------------+----------+
|Aiqing wansui (1994)|       5.0|
|Marlene Dietrich:...|       5.0|
|Someone Else's Am...|       5.0|
|Saint of Fort Was...|       5.0|
|     Star Kid (1997)|       5.0|
|Great Day in Harl...|       5.0|
|Entertaining Ange...|       5.0|
|They Made Me a Cr...|       5.0|
|Santa with Muscle...|       5.0|
|  Prefontaine (1997)|       5.0|
|Pather Panchali (...|      4.63|
|         Anna (1996)|       4.5|
|      Everest (1998)|       4.5|
|Some Mother's Son...|       4.5|
|Maya Lin: A Stron...|       4.5|
|Close Shave, A (1...|      4.49|
|Schindler's List ...|      4.47|
|Wrong Trousers, T...|      4.47|
|   Casablanca (1942)|      4.46|
|Wallace & Gromit:...|      4.45|
|Shawshank Redempt...|      4.45|
|Usual Suspects, T...|      4.39|
|  Rear Window (1954)|      4.39|
|    Star Wars (1977)|      4.36|
| 12 Angry Men (1957)|      4.34|
|Bitter Sugar (Azu...|      4.33|
|Third Man, Th

Prosečna ocena prosečnih ocena po filmu.

In [216]:
avg_per_movie=moviesDemo.select('Title','rating').groupBy('Title').agg(func.round(func.avg('rating'),2).alias('Avg-Rating')).orderBy('Avg-Rating',ascending=False)
avg_per_movie.agg(func.mean('Avg-Rating')).show()

+------------------+
|   avg(Avg-Rating)|
+------------------+
|3.0771454326923076|
+------------------+



# **Sistem preporuke baziran na sadržaju**

In [217]:
moviesGenre.show(5)

+-------+--------------------+------------+---------+--------------------+-------+------+---------+---------+----------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+------+---------+
|movieID|               Title|release date|timestamp|                imdb|unknown|Action|Adventure|Animation|Children's|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|userID|rating|timestamp|
+-------+--------------------+------------+---------+--------------------+-------+------+---------+---------+----------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+------+------+---------+
|    242|        Kolya (1996)| 24-Jan-1997|     NULL|http://us.imdb.co...|      0|     0|        0|        0|         0|     1|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0|   196|     3

In [218]:
moviesDemo.show(5)

+------+-------+------+---------+--------------------+---+------+----------+
|userID|movieID|rating|timestamp|               Title|age|gender|occupation|
+------+-------+------+---------+--------------------+---+------+----------+
|   196|    242|     3|881250949|        Kolya (1996)| 49|     M|    writer|
|   186|    302|     3|891717742|L.A. Confidential...| 39|     F| executive|
|    22|    377|     1|878887116| Heavyweights (1994)| 25|     M|    writer|
|   244|     51|     2|880606923|Legends of the Fa...| 28|     M|technician|
|   166|    346|     1|886397596| Jackie Brown (1997)| 47|     M|  educator|
+------+-------+------+---------+--------------------+---+------+----------+
only showing top 5 rows



In [219]:
moviesInfo.show()

+-------+--------------------+------------+---------+--------------------+-------+------+---------+---------+----------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+
|movieID|               Title|release date|timestamp|                imdb|unknown|Action|Adventure|Animation|Children's|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|
+-------+--------------------+------------+---------+--------------------+-------+------+---------+---------+----------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+
|      1|    Toy Story (1995)| 01-Jan-1995|     NULL|http://us.imdb.co...|      0|     0|        0|        1|         1|     1|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0|
|      2|    GoldenEye (1995)| 01-Jan-1995|     NULL|http://us.imdb.co...|      0|  

Na osnovu naziva filma napravljena je nova kolona Year-goina premijere, a to je uradjeno korišćenjem regexp_extract-a(stavljeno je da prepozna da se godina nalazi u zagradama i da sadrži 4 cifre).

In [220]:
from pyspark.sql.functions import regexp_extract
moviesInfo = moviesInfo.withColumn('Year', regexp_extract("Title", r"\((\d{4})\)", 1).cast("int")).select('movieID','Year',*genres)

In [221]:
top50.show()

+-------+-----+--------------------+
|movieID|count|          movieTitle|
+-------+-----+--------------------+
|     50|  583|    Star Wars (1977)|
|    258|  509|      Contact (1997)|
|    100|  508|        Fargo (1996)|
|    181|  507|Return of the Jed...|
|    294|  485|    Liar Liar (1997)|
|    286|  481|English Patient, ...|
|    288|  478|       Scream (1996)|
|      1|  452|    Toy Story (1995)|
|    300|  431|Air Force One (1997)|
|    121|  429|Independence Day ...|
|    174|  420|Raiders of the Lo...|
|    127|  413|Godfather, The (1...|
|     56|  394| Pulp Fiction (1994)|
|      7|  392|Twelve Monkeys (1...|
|     98|  390|Silence of the La...|
|    237|  384|Jerry Maguire (1996)|
|    117|  378|    Rock, The (1996)|
|    172|  367|Empire Strikes Ba...|
|    222|  365|Star Trek: First ...|
|    204|  350|Back to the Futur...|
+-------+-----+--------------------+
only showing top 20 rows



Jedan od predhodnih zahteva bio je da se prikažu top50 filmova, a to je sačuvano u tabeli top50. Kako bismo napravili regresione modela za top50 filmova bazu moviesInfo trebamo da join-ujemo po inner joinu sa top50.

In [222]:
moviesInfoTop50 = moviesInfo.join(top50,'movieID','inner')
moviesInfoTop50 = moviesInfoTop50.drop('count')
moviesInfoTop50.show()
moviesInfoTop50.count()


+-------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+--------------------+
|movieID|Year|Crime|Action|Drama|Western|Horror|unknown|Animation|Mystery|Thriller|Documentary|War|Sci-Fi|Film-Noir|Musical|Comedy|Romance|Adventure|Fantasy|Children's|          movieTitle|
+-------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+--------------------+
|      1|1995|    0|     0|    0|      0|     0|      0|        1|      0|       0|          0|  0|     0|        0|      0|     1|      0|        0|      0|         1|    Toy Story (1995)|
|      7|1995|    0|     0|    1|      0|     0|      0|        0|      0|       0|          0|  0|     1|        0|      0|     0|      0|        0|      0|         0|Twelve Monkeys (1...|
|      9|1995|    0|     0|    1|      0|     0|  

50

In [223]:
moviesDemo.select('movieID').distinct().count()

1682

In [224]:
moviesInfoTop50.select('movieID').distinct().count()

50

In [225]:
print(moviesDemo.count())
moviesDemo.show()

100000
+------+-------+------+---------+--------------------+---+------+-------------+
|userID|movieID|rating|timestamp|               Title|age|gender|   occupation|
+------+-------+------+---------+--------------------+---+------+-------------+
|   196|    242|     3|881250949|        Kolya (1996)| 49|     M|       writer|
|   186|    302|     3|891717742|L.A. Confidential...| 39|     F|    executive|
|    22|    377|     1|878887116| Heavyweights (1994)| 25|     M|       writer|
|   244|     51|     2|880606923|Legends of the Fa...| 28|     M|   technician|
|   166|    346|     1|886397596| Jackie Brown (1997)| 47|     M|     educator|
|   298|    474|     4|884182806|Dr. Strangelove o...| 44|     M|    executive|
|   115|    265|     2|881171488|Hunt for Red Octo...| 31|     M|     engineer|
|   253|    465|     5|891628467|Jungle Book, The ...| 26|     F|    librarian|
|   305|    451|     3|886324817|       Grease (1978)| 23|     M|   programmer|
|     6|     86|     3|883603013|

Kako su za zahtev potrebne i kolone iz moviesDemo(konkretno kolone vezane za informacije o korisniku), spajamo tu tabelu sa predhodno napravljenom.

In [226]:
moviesTrain = moviesInfoTop50.join(moviesDemo.select("movieID","userID","age","gender","occupation"),'movieID','inner')
moviesTrain.count()

17841

In [227]:
moviesTrain.show()

+-------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+----------------+------+---+------+-------------+
|movieID|Year|Crime|Action|Drama|Western|Horror|unknown|Animation|Mystery|Thriller|Documentary|War|Sci-Fi|Film-Noir|Musical|Comedy|Romance|Adventure|Fantasy|Children's|      movieTitle|userID|age|gender|   occupation|
+-------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+----------------+------+---+------+-------------+
|      1|1995|    0|     0|    0|      0|     0|      0|        1|      0|       0|          0|  0|     0|        0|      0|     1|      0|        0|      0|         1|Toy Story (1995)|   593| 31|     F|     educator|
|      1|1995|    0|     0|    0|      0|     0|      0|        1|      0|       0|          0|  0|     0|        0|      0|    

In [228]:
moviesTrain.count()

17841

In [229]:
moviesTrain.select('movieID').distinct().count()

50

Učitavamo nove tabelu u1.base pod nazivom train i u1.test pod nazivom test.

In [230]:
train = spark.read.option('sep','\t').schema(schema).csv('/content/drive/MyDrive/BIGDATA/Data/ml-100k/u1.base')
train.show(5)
train.count()

+------+-------+------+---------+
|userID|movieID|rating|timestamp|
+------+-------+------+---------+
|     1|      1|     5|874965758|
|     1|      2|     3|876893171|
|     1|      3|     4|878542960|
|     1|      4|     3|876893119|
|     1|      5|     3|889751712|
+------+-------+------+---------+
only showing top 5 rows



80000

In [231]:
test = spark.read.option('sep','\t').schema(schema).csv('/content/drive/MyDrive/BIGDATA/Data/ml-100k/u1.test')
test.show(5)
test.count()

+------+-------+------+---------+
|userID|movieID|rating|timestamp|
+------+-------+------+---------+
|     1|      6|     5|887431973|
|     1|     10|     3|875693118|
|     1|     12|     5|878542960|
|     1|     14|     5|874965706|
|     1|     17|     3|875073198|
+------+-------+------+---------+
only showing top 5 rows



20000

Sada učitan dataSet train je potrebno join-ovati sa gore napravljenom i izdvojenom tabelom moviesTrain(u okviru koje su informacije o korisnicim ai filmovima sa rating-om, svaka instanca ima složen ključ(moviesID, userID), zato se join-uje po ovom složenom ključu\), Isto se radi i za test skup.

In [232]:
train = train.select('movieID','userID','rating').join(moviesTrain, ['movieID', 'userID'],'inner')
print(train.select('movieID').distinct().count())
train.show()

50
+-------+------+------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+----------------+---+------+-------------+
|movieID|userID|rating|Year|Crime|Action|Drama|Western|Horror|unknown|Animation|Mystery|Thriller|Documentary|War|Sci-Fi|Film-Noir|Musical|Comedy|Romance|Adventure|Fantasy|Children's|      movieTitle|age|gender|   occupation|
+-------+------+------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+----------------+---+------+-------------+
|     28|   913|     3|1995|    0|     1|    1|      0|     0|      0|        0|      0|       1|          0|  0|     0|        0|      0|     0|      0|        0|      0|         0|Apollo 13 (1995)| 27|     M|      student|
|     28|   927|     4|1995|    0|     1|    1|      0|     0|      0|        0|      0|       1|

In [233]:
train.count()

14349

In [234]:
test = test.select('movieID','userID','rating').join(moviesTrain, ['movieID', 'userID'],'inner')
print(test.select('movieID').distinct().count())
test.show()

50
+-------+------+------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+----------------+---+------+----------+
|movieID|userID|rating|Year|Crime|Action|Drama|Western|Horror|unknown|Animation|Mystery|Thriller|Documentary|War|Sci-Fi|Film-Noir|Musical|Comedy|Romance|Adventure|Fantasy|Children's|      movieTitle|age|gender|occupation|
+-------+------+------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+----------------+---+------+----------+
|      1|   339|     5|1995|    0|     0|    0|      0|     0|      0|        1|      0|       0|          0|  0|     0|        0|      0|     1|      0|        0|      0|         1|Toy Story (1995)| 35|     M|    lawyer|
|      1|   396|     4|1995|    0|     0|    0|      0|     0|      0|        1|      0|       0|          0|

In [235]:
test.count()

3492

In [236]:
train.show()
test.show()

+-------+------+------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+----------------+---+------+-------------+
|movieID|userID|rating|Year|Crime|Action|Drama|Western|Horror|unknown|Animation|Mystery|Thriller|Documentary|War|Sci-Fi|Film-Noir|Musical|Comedy|Romance|Adventure|Fantasy|Children's|      movieTitle|age|gender|   occupation|
+-------+------+------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+----------------+---+------+-------------+
|      1|   593|     3|1995|    0|     0|    0|      0|     0|      0|        1|      0|       0|          0|  0|     0|        0|      0|     1|      0|        0|      0|         1|Toy Story (1995)| 31|     F|     educator|
|      1|   800|     4|1995|    0|     0|    0|      0|     0|      0|        1|      0|       0|   

In [237]:
train.select('occupation').distinct().show()

+-------------+
|   occupation|
+-------------+
|    librarian|
|      retired|
|       lawyer|
|         none|
|       writer|
|   programmer|
|    marketing|
|        other|
|    executive|
|    scientist|
|      student|
|     salesman|
|       artist|
|   technician|
|administrator|
|     engineer|
|   healthcare|
|     educator|
|entertainment|
|    homemaker|
+-------------+
only showing top 20 rows



Kako je za potrebe regresionih modela obavezno korišćenje kolona zanimanja i pola, potrebno je napraviti prvo index-ere za iste, a potop i one-hot-encoder za dobijene index-ere. Indexer-ima se svakoj vrednosti zadate kolone dodeljuje neka vrednost(ceo broj, prvi broj je 0), a sa OneHotEncoder-om dobijamo dimenzije vektora, sa pozicijama i njihovim vrednostima.

To je uradjeno za zanimanje i pol.

In [238]:
occ_index = StringIndexer(inputCol= 'occupation' ,outputCol='OccupationIndex')
occ_encoder = OneHotEncoder(inputCol='OccupationIndex',outputCol='OccupationVec')

In [239]:
gender_index = StringIndexer(inputCol= 'gender' ,outputCol='SexIndex')
gender_encoder = OneHotEncoder(inputCol='SexIndex',outputCol='SexVec')

In [240]:
from pyspark.ml import Pipeline

Radi lakše primene napravljenih funkcija napravljen je pipeline za svaku kolonu.

In [241]:
pipeline_gr = Pipeline(stages=[gender_index,
                           gender_encoder])

In [242]:
pipeline_occ = Pipeline(stages=[occ_index,
                           occ_encoder])

In [243]:
train.select('Year').distinct().count()

21

In [244]:
from pyspark.ml.feature import MinMaxScaler

Kako od potrebnih kolona 2 imaju numericke vrednoste, one us normalizovane MinMaxScaler-om. Preko asemblera pravimo ulazni vektor za regresione modele i ALS. On uzima sve vrednosti ulaznih kolona i daje jedan izlazni vektor.

In [245]:
vector_norm = VectorAssembler(inputCols=['age', 'Year'], outputCol='norm')
scaler = MinMaxScaler(inputCol='norm', outputCol='norm_features')
pipeline_norm = Pipeline(stages=[vector_norm, scaler])
assembler = VectorAssembler(inputCols=[*genres,
 'SexVec',
 'OccupationVec',
 'norm_features'],
 outputCol='features')

Glavni pipeline sadrzi sve procedure pripreme podataka, bitno je naglasiti da se on uci na train skupu, a fituje na testu.

In [246]:
pipeline_main=Pipeline(stages=[pipeline_gr, pipeline_occ, pipeline_norm, assembler])
pipelineMain=pipeline_main.fit(train)
tr=pipelineMain.transform(train)
ts=pipelineMain.transform(test)
tr.show()

+-------+------+------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+----------------+---+------+-------------+--------+-------------+---------------+---------------+-------------+--------------------+--------------------+
|movieID|userID|rating|Year|Crime|Action|Drama|Western|Horror|unknown|Animation|Mystery|Thriller|Documentary|War|Sci-Fi|Film-Noir|Musical|Comedy|Romance|Adventure|Fantasy|Children's|      movieTitle|age|gender|   occupation|SexIndex|       SexVec|OccupationIndex|  OccupationVec|         norm|       norm_features|            features|
+-------+------+------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+----------------+---+------+-------------+--------+-------------+---------------+---------------+-------------+--------------------+--------------

In [247]:
ts.show()

+-------+------+------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+----------------+---+------+----------+--------+-------------+---------------+---------------+-------------+--------------------+--------------------+
|movieID|userID|rating|Year|Crime|Action|Drama|Western|Horror|unknown|Animation|Mystery|Thriller|Documentary|War|Sci-Fi|Film-Noir|Musical|Comedy|Romance|Adventure|Fantasy|Children's|      movieTitle|age|gender|occupation|SexIndex|       SexVec|OccupationIndex|  OccupationVec|         norm|       norm_features|            features|
+-------+------+------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+----------------+---+------+----------+--------+-------------+---------------+---------------+-------------+--------------------+--------------------+
|

In [248]:
from pyspark.ml.regression import LinearRegression

Promena imena kolone iz rating u label.

In [249]:
tr = tr.withColumnRenamed("rating", "label")
ts = ts.withColumnRenamed("rating", "label")
tr.show()

+-------+------+-----+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+----------------+---+------+-------------+--------+-------------+---------------+---------------+-------------+--------------------+--------------------+
|movieID|userID|label|Year|Crime|Action|Drama|Western|Horror|unknown|Animation|Mystery|Thriller|Documentary|War|Sci-Fi|Film-Noir|Musical|Comedy|Romance|Adventure|Fantasy|Children's|      movieTitle|age|gender|   occupation|SexIndex|       SexVec|OccupationIndex|  OccupationVec|         norm|       norm_features|            features|
+-------+------+-----+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+----------------+---+------+-------------+--------+-------------+---------------+---------------+-------------+--------------------+-----------------

Učenje defaultnog modela Linerane regresije na trening skupu.

In [250]:
lr = LinearRegression(labelCol='label', featuresCol='features')
model_lr=lr.fit(tr)

Evaluacija na trening skupu podataka.

In [251]:
test_results=model_lr.evaluate(tr)
print(test_results.rootMeanSquaredError)

0.9972216737514644


Primena i evaluacija preko RootMeanSquareErrora na test podacima.

In [252]:
predictions = model_lr.transform(ts)
predictions.show()
test_eval= model_lr.evaluate(ts)

+-------+------+-----+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+----------------+---+------+----------+--------+-------------+---------------+---------------+-------------+--------------------+--------------------+------------------+
|movieID|userID|label|Year|Crime|Action|Drama|Western|Horror|unknown|Animation|Mystery|Thriller|Documentary|War|Sci-Fi|Film-Noir|Musical|Comedy|Romance|Adventure|Fantasy|Children's|      movieTitle|age|gender|occupation|SexIndex|       SexVec|OccupationIndex|  OccupationVec|         norm|       norm_features|            features|        prediction|
+-------+------+-----+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+----------------+---+------+----------+--------+-------------+---------------+---------------+-------------+---------

In [253]:
print("RMSE bez optimizacije: {}".format(test_results.rootMeanSquaredError))
print("MSE bez optimizacije: {}".format(test_results.meanSquaredError))

RMSE bez optimizacije: 0.9972216737514644
MSE bez optimizacije: 0.9944510665996721


Optimizacija linearneRegresije, konkretno parametara: regParam(predstavlja koeficijent regularizacije, regularizacija se koristi kako bi se sprečilo preprilagođavanje modela tako što se kazni kompleksnost modela) i elasticNetParam(0- lasso(retki parametri teze 0), 0.5- kombinacija obe, 1- ridge)

In [254]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

lr = LinearRegression(labelCol='label', maxIter=10)
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01, 0.001]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

evaluator = RegressionEvaluator(metricName="rmse")

crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

cvModel = crossval.fit(tr)
best_lr=cvModel.bestModel

predictions = best_lr.transform(tr)

rmse = evaluator.evaluate(predictions)
print('Za trening:')
print('---------------------------------------------------------------------------------')
print(rmse)
print("Best Model Hyperparameters:")
print("regParam: ", cvModel.bestModel._java_obj.getRegParam())
print("elasticNetParam: ", cvModel.bestModel._java_obj.getElasticNetParam())

Za trening:
---------------------------------------------------------------------------------
0.9972881666709932
Best Model Hyperparameters:
regParam:  0.01
elasticNetParam:  0.0


Najbolji model je primenjen i na testu. Na osnovu rezultata testa i treninga možemo da zaključimo da model na osnovu naše metrike nije overfit-ovao.

In [255]:
predictions = best_lr.transform(ts)
print('Za test:')

rmse = evaluator.evaluate(predictions)
print('---------------------------------------------------------------------------------')
print(rmse)
print("Best Model Hyperparameters:")
print("regParam: ", cvModel.bestModel._java_obj.getRegParam())
print("elasticNetParam: ", cvModel.bestModel._java_obj.getElasticNetParam())

Za test:
---------------------------------------------------------------------------------
1.0034522106438586
Best Model Hyperparameters:
regParam:  0.01
elasticNetParam:  0.0


In [256]:
from pyspark.ml.regression import RandomForestRegressor

Instanciran je i naučen default-an model ansambla RandomForest.

In [257]:
rf = RandomForestRegressor(labelCol='label', featuresCol='features')
model_rf=rf.fit(tr)

Defaultni model je primenjen na test i dobijeni su rezultati, rezultsti su slični kao i predhodni.

In [258]:
evaluator_rmse = RegressionEvaluator(metricName="rmse")
evaluator_mse = RegressionEvaluator(metricName="mse")

predictions = model_rf.transform(ts)
predictions.show()
test_eval_rmse= evaluator_rmse.evaluate(predictions)
test_eval_mse= evaluator_mse.evaluate(predictions)
print("RMSE bez optimizacije: {}".format(test_eval_rmse))
print("MSE bez optimizacije: {}".format(test_eval_mse))

+-------+------+-----+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+----------------+---+------+----------+--------+-------------+---------------+---------------+-------------+--------------------+--------------------+------------------+
|movieID|userID|label|Year|Crime|Action|Drama|Western|Horror|unknown|Animation|Mystery|Thriller|Documentary|War|Sci-Fi|Film-Noir|Musical|Comedy|Romance|Adventure|Fantasy|Children's|      movieTitle|age|gender|occupation|SexIndex|       SexVec|OccupationIndex|  OccupationVec|         norm|       norm_features|            features|        prediction|
+-------+------+-----+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+----------------+---+------+----------+--------+-------------+---------------+---------------+-------------+---------

Izvršena je optimizacija ansambla, optimizovani su: numTrees(broj stabala za predikcije) i maxDepth(maksimalna dubina stabla).

In [260]:
rf = RandomForestRegressor(labelCol="label", featuresCol="features")

param_grid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20]) \
    .addGrid(rf.maxDepth, [10, 15]) \
    .build()

evaluator = RegressionEvaluator(metricName="rmse")

crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator,
                          numFolds=3)

cvModel = crossval.fit(tr)
best_rf=cvModel.bestModel

predictions = best_rf.transform(tr)

rmse = evaluator.evaluate(predictions)
print('---------------------------------------------------------------------------------')
print(rmse)
print("Best number of trees:", best_rf.getNumTrees)

---------------------------------------------------------------------------------
0.8673043710847383
Best number of trees: 20


In [None]:
predictions = best_rf.transform(ts)
print('Za test:')

rmse = evaluator.evaluate(predictions)
print('---------------------------------------------------------------------------------')
print(rmse)
print("Best number of trees:", best_rf.getNumTrees)

**Sistem preporuke baziran na kolaborativnom filtriranju**

In [262]:
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import isnan

Naredno je odradjen zahtev vezan za kolaborativno filtriranje, izvršena je optimizacija za ALS model. Konkretno: rank(ovaj parametar određuje koliko dimenzija će imati latentni prostor koji se koristi za predstavljanje korisnika i stavki u modelu, veći "rank" obično omogućava modelu da nauči složenije obrasce u podacima, ali može dovesti do veće složenosti modela i produženog vremena obučavanja), regParam( parametar regularizacije koji kontroliše jačinu regularizacije u modelu ALS, koristi se kako bi se sprečilo prenaučavanje (overfitting) modela na trening skupu podataka), maxIter.

In [263]:
als = ALS(userCol="userID", itemCol="movieID", ratingCol="label")
paramGrid = ParamGridBuilder() \
    .addGrid(als.rank, [10, 50]) \
    .addGrid(als.regParam, [0.01, 0.1]) \
    .addGrid(als.maxIter, [5, 10]) \
    .build()

evaluator = RegressionEvaluator(metricName="rmse", labelCol="label", predictionCol="prediction")

crossval = CrossValidator(estimator=als,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

cvModel = crossval.fit(tr)
bestModel_ALS = cvModel.bestModel

predictions = bestModel_ALS.transform(tr)
predictions = predictions.filter(~isnan(col("prediction")))

rmse = evaluator.evaluate(predictions)
print("RMSE: " + str(rmse))

RMSE: 0.5203673024686022


In [264]:
predictions = bestModel_ALS.transform(ts)
predictions = predictions.filter(~isnan(col("prediction")))

rmse = evaluator.evaluate(predictions)
print("RMSE: " + str(rmse))

RMSE: 1.2913454725831839


- Korišćenjem u2.base identifikujte korisnike koji su dali manje od 5 ocena ili se ne pojavljuju u u1.base i napravite dve promenljive: users_with_grades (korisnici iz u2.base koji se pojavljuju u u1.base i imaju minimum 5 ocena) i users_without_grades (korisnici iz u2.base koji se ne pojavljuju u u1.base ili imaju manje od 5 ocena)). Takođe, odvojiti filmove u dve različite promenljive: movies_u2_only – filmovi koji se ne nalaze u u1.base a nalaze se u u2_base i movies_u_12 – filmovi koji se nalaze u obe taabele.
- Napraviti proceduru koja će deliti korisnike iz u2.base na osnovu prethodnog opisa i koja će za users_with_grades previđati na osnovu ALS-a, a za users_without_grades predviđati na osnovu regresionog modela iz prethodne stavke. Na izlazu procedure treba da postoje predikcije za sve korisnike.

Učitavanje nove baze u2.base

In [265]:
train_2 = spark.read.option('sep','\t').schema(schema).csv('/content/drive/MyDrive/BIGDATA/Data/ml-100k/u2.base')
train_2.show()

+------+-------+------+---------+
|userID|movieID|rating|timestamp|
+------+-------+------+---------+
|     1|      3|     4|878542960|
|     1|      4|     3|876893119|
|     1|      5|     3|889751712|
|     1|      6|     5|887431973|
|     1|      7|     4|875071561|
|     1|     10|     3|875693118|
|     1|     11|     2|875072262|
|     1|     12|     5|878542960|
|     1|     13|     5|875071805|
|     1|     14|     5|874965706|
|     1|     15|     5|875071608|
|     1|     16|     5|878543541|
|     1|     17|     3|875073198|
|     1|     18|     4|887432020|
|     1|     19|     5|875071515|
|     1|     20|     4|887431883|
|     1|     23|     4|875072895|
|     1|     24|     3|875071713|
|     1|     25|     4|875071805|
|     1|     27|     2|876892946|
+------+-------+------+---------+
only showing top 20 rows



Spajanje sa tabelom moviesTrain, po složenom kljuc3u [user, movie]

In [266]:
train_2 = train_2.select('movieID','userID','rating').join(moviesTrain, ['movieID', 'userID'],'inner')
print(train_2.select('movieID').distinct().count())
train_2=pipelineMain.transform(train_2)
train_2.show()

50
+-------+------+------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+----------------+---+------+-------------+--------+-------------+---------------+---------------+-------------+--------------------+--------------------+
|movieID|userID|rating|Year|Crime|Action|Drama|Western|Horror|unknown|Animation|Mystery|Thriller|Documentary|War|Sci-Fi|Film-Noir|Musical|Comedy|Romance|Adventure|Fantasy|Children's|      movieTitle|age|gender|   occupation|SexIndex|       SexVec|OccupationIndex|  OccupationVec|         norm|       norm_features|            features|
+-------+------+------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+----------------+---+------+-------------+--------+-------------+---------------+---------------+-------------+--------------------+-----------

In [267]:
train_2.select('userID').distinct().count()

940

prvo ostavljamo samo ID-jeve user-a koji su dali više od 5 ocena, a potop gledamo dal su se ti useri pojavili i u u1.base tabeli.

In [268]:
users_with_grades = train_2.groupBy("userID").count().where("count >= 5").select("userID").distinct()
users_with_grades=users_with_grades.join(train, "userID", 'inner').select('userID').distinct()
users_with_grades.count()

845

In [269]:
from pyspark.sql.functions import asc
users_with_grades.select('userID').orderBy(asc('userID')).show()

+------+
|userID|
+------+
|     1|
|     2|
|     5|
|     6|
|     7|
|     8|
|    10|
|    11|
|    12|
|    13|
|    14|
|    15|
|    16|
|    17|
|    18|
|    20|
|    21|
|    22|
|    23|
|    24|
+------+
only showing top 20 rows



Gore dobijene userID-jeve inner join-ujemo sa u2.base, kako bi izdvojili redove sa dobijenim user-ima.

In [270]:
data=users_with_grades.join(train_2, 'userID', 'inner')
data.show()
data.select('userID').distinct().orderBy(asc('userID')).show()

+------+-------+------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+--------------------+---+------+----------+--------+-------------+---------------+--------------+-------------+--------------------+--------------------+
|userID|movieID|rating|Year|Crime|Action|Drama|Western|Horror|unknown|Animation|Mystery|Thriller|Documentary|War|Sci-Fi|Film-Noir|Musical|Comedy|Romance|Adventure|Fantasy|Children's|          movieTitle|age|gender|occupation|SexIndex|       SexVec|OccupationIndex| OccupationVec|         norm|       norm_features|            features|
+------+-------+------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+--------------------+---+------+----------+--------+-------------+---------------+--------------+-------------+--------------------+--------------

In [271]:
data.count()

14059

Sada izdvajamo userId-jeve gde se user ne pojavljuje u u1.base, preko left-anti join-a.

In [272]:
users_without_grades= train_2.join(train, on="userID", how="left_anti").select("userID").distinct()
users_without_grades.show()
users_without_grades.count()

+------+
|userID|
+------+
|   341|
|   172|
|   208|
+------+



3

In [273]:
train_2.groupby('userID').count().filter(col("count")<5).select('userID').count()

94

pravimo uniju gore dobijenih vrednosti sa userID-jevima gde je user ocenio manje od 5 filmova.

In [274]:
users_without_grades=users_without_grades.union(train_2.groupby('userID').count().filter(col("count")<5).select('userID').distinct())
users_without_grades=users_without_grades.select('userID').distinct()
users_without_grades.count()

95

Izdvajamo iz u2.base redove koji isounjavaju gore zadate uslove(preko izdvojenih userID-je)

In [276]:
data2=users_without_grades.join(train_2, 'userID', 'inner')
data2.show()

+------+-------+------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+--------------------+---+------+-------------+--------+-------------+---------------+---------------+-------------+--------------------+--------------------+
|userID|movieID|rating|Year|Crime|Action|Drama|Western|Horror|unknown|Animation|Mystery|Thriller|Documentary|War|Sci-Fi|Film-Noir|Musical|Comedy|Romance|Adventure|Fantasy|Children's|          movieTitle|age|gender|   occupation|SexIndex|       SexVec|OccupationIndex|  OccupationVec|         norm|       norm_features|            features|
+------+-------+------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+--------------------+---+------+-------------+--------+-------------+---------------+---------------+-------------+--------------------+--

In [277]:
data2.count()

291

Gore napisane linije koda za radvajanje redova na users_with_grades i users)without_grades ubacujemo u proceduru, a potom izvršavamo na svakom podskupu jedan model i spajamo rezultate preko union.

In [278]:
def predict_ratings(data, data2):
  users_with_grades = data2.groupBy("userID").count().where("count >= 5").select("userID").distinct()
  users_with_grades=users_with_grades.join(data, "userID", 'inner').select('userID').distinct()
  users_with_grades.count()
  users_with_grades=users_with_grades.join(data2, 'userID', 'inner')
  print('users with grades count:')
  print(users_with_grades.count())


  users_without_grades= data2.join(data, on="userID", how="left_anti").select("userID").distinct()
  users_without_grades=users_without_grades.union(data2.groupby('userID').count().filter(col("count")<5).select('userID').distinct())
  users_without_grades=users_without_grades.select('userID').distinct()
  users_without_grades=users_without_grades.join(data2, 'userID', 'inner')
  print('users without grades count:')
  print(users_without_grades.count())

  predictions_with_grades = bestModel_ALS.transform(users_with_grades)

  predictions_without_grades = best_rf.transform(users_without_grades)

  all_predictions = predictions_with_grades.union(predictions_without_grades)

  return all_predictions

In [279]:
predictions=predict_ratings(train, train_2)
predictions.show()

users with grades count:
14059
users without grades count:
291
+------+-------+------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+--------------------+---+------+----------+--------+-------------+---------------+--------------+-------------+--------------------+--------------------+------------------+
|userID|movieID|rating|Year|Crime|Action|Drama|Western|Horror|unknown|Animation|Mystery|Thriller|Documentary|War|Sci-Fi|Film-Noir|Musical|Comedy|Romance|Adventure|Fantasy|Children's|          movieTitle|age|gender|occupation|SexIndex|       SexVec|OccupationIndex| OccupationVec|         norm|       norm_features|            features|        prediction|
+------+-------+------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+--------------------+---+------+----------+--

In [280]:
predictions.count()

14350

In [281]:
predictions = predictions.withColumnRenamed("rating", "label")
predictions = predictions.filter(~isnan(col("prediction")))
rmse = evaluator.evaluate(predictions)
print("RMSE: " + str(rmse))

RMSE: 0.7881426971421129


Izdvajanje filmova koji se samo nalaze u u2.base

In [282]:
movies_u2_only = train_2.join(train, 'movieID', 'left_anti')
movies_u2_only.show()

+-------+------+------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+----------------+---+------+-------------+--------+-------------+---------------+---------------+-------------+--------------------+--------------------+
|movieID|userID|rating|Year|Crime|Action|Drama|Western|Horror|unknown|Animation|Mystery|Thriller|Documentary|War|Sci-Fi|Film-Noir|Musical|Comedy|Romance|Adventure|Fantasy|Children's|      movieTitle|age|gender|   occupation|SexIndex|       SexVec|OccupationIndex|  OccupationVec|         norm|       norm_features|            features|
+-------+------+------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+----------------+---+------+-------------+--------+-------------+---------------+---------------+-------------+--------------------+--------------

Izdvajanje filmova koji se nalaze i u u1 i u2.base.

In [283]:
movies_u_12 = train_2.join(train, on='movieID', how='inner')
movies_u_12.show()

+-------+------+------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+----------------+---+------+----------+--------+-------------+---------------+--------------+-------------+--------------------+--------------------+------+------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+----------------+---+------+-------------+
|movieID|userID|rating|Year|Crime|Action|Drama|Western|Horror|unknown|Animation|Mystery|Thriller|Documentary|War|Sci-Fi|Film-Noir|Musical|Comedy|Romance|Adventure|Fantasy|Children's|      movieTitle|age|gender|occupation|SexIndex|       SexVec|OccupationIndex| OccupationVec|         norm|       norm_features|            features|userID|rating|Year|Crime|Action|Drama|Western|Horror|unknown|Animation|Mystery|Thriller|Documentary|War|Sci-Fi|Film-Noir|M

In [284]:
best_user= train_2.groupBy('userID').count().orderBy(col("count").desc()).limit(1)

In [285]:
best_user=best_user.drop('count')
best_user.show()

+------+
|userID|
+------+
|   655|
+------+



In [286]:
demographics.show()

+------+---+------+-------------+
|userID|age|gender|   occupation|
+------+---+------+-------------+
|     1| 24|     M|   technician|
|     2| 53|     F|        other|
|     3| 23|     M|       writer|
|     4| 24|     M|   technician|
|     5| 33|     F|        other|
|     6| 42|     M|    executive|
|     7| 57|     M|administrator|
|     8| 36|     M|administrator|
|     9| 29|     M|      student|
|    10| 53|     M|       lawyer|
|    11| 39|     F|        other|
|    12| 28|     F|        other|
|    13| 47|     M|     educator|
|    14| 45|     M|    scientist|
|    15| 49|     F|     educator|
|    16| 21|     M|entertainment|
|    17| 30|     M|   programmer|
|    18| 35|     F|        other|
|    19| 40|     M|    librarian|
|    20| 42|     F|    homemaker|
+------+---+------+-------------+
only showing top 20 rows



In [287]:
best_user= best_user.join(demographics, 'userID', 'inner')
best_user.show()

+------+---+------+----------+
|userID|age|gender|occupation|
+------+---+------+----------+
|   655| 50|     F|healthcare|
+------+---+------+----------+



In [288]:
moviesInfo.show()

+-------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+
|movieID|Year|Crime|Action|Drama|Western|Horror|unknown|Animation|Mystery|Thriller|Documentary|War|Sci-Fi|Film-Noir|Musical|Comedy|Romance|Adventure|Fantasy|Children's|
+-------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+
|      1|1995|    0|     0|    0|      0|     0|      0|        1|      0|       0|          0|  0|     0|        0|      0|     1|      0|        0|      0|         1|
|      2|1995|    0|     1|    0|      0|     0|      0|        0|      0|       1|          0|  0|     0|        0|      0|     0|      0|        1|      0|         0|
|      3|1995|    0|     0|    0|      0|     0|      0|        0|      0|       1|          0|  0|     0|        0|      0|     0|      0|        0|      

In [289]:
moviesInfo.count()

1682

In [290]:
best_user_rec = best_user.crossJoin(moviesInfo)
best_user_rec.count()

1682

In [291]:
from pyspark.sql.functions import lit
best_user_rec=best_user_rec.withColumn('rating', lit(0))
best_user_rec.show()

+------+---+------+----------+-------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+------+
|userID|age|gender|occupation|movieID|Year|Crime|Action|Drama|Western|Horror|unknown|Animation|Mystery|Thriller|Documentary|War|Sci-Fi|Film-Noir|Musical|Comedy|Romance|Adventure|Fantasy|Children's|rating|
+------+---+------+----------+-------+----+-----+------+-----+-------+------+-------+---------+-------+--------+-----------+---+------+---------+-------+------+-------+---------+-------+----------+------+
|   655| 50|     F|healthcare|      1|1995|    0|     0|    0|      0|     0|      0|        1|      0|       0|          0|  0|     0|        0|      0|     1|      0|        0|      0|         1|     0|
|   655| 50|     F|healthcare|      2|1995|    0|     1|    0|      0|     0|      0|        0|      0|       1|          0|  0|     0|        0|      0|     0|      0|        1|  

In [292]:
best_user_rec= pipelineMain.transform(best_user_rec)

In [293]:
best_user_rec.count()

1682

In [309]:
performances_best=predict_ratings(train, best_user_rec)
performances_best = performances_best.filter(~isnan(col("prediction")))

users with grades count:
1682
users without grades count:
0


In [310]:
ranked_recomendations = performances_best.orderBy(col("prediction").desc()).select(['movieID','prediction'])

In [311]:
ranked_recomendations = ranked_recomendations.withColumn("movieTitle", lookupNameUDF(func.col("movieID")))
ranked_recomendations.show()

+-------+------------------+--------------------+
|movieID|        prediction|          movieTitle|
+-------+------------------+--------------------+
|     98| 4.096625328063965|Silence of the La...|
|    127| 4.074453353881836|Godfather, The (1...|
|    276| 3.845383644104004|Leaving Las Vegas...|
|    100|3.7586288452148438|        Fargo (1996)|
|    269|  3.75205659866333|Full Monty, The (...|
|     64|3.7048089504241943|Shawshank Redempt...|
|    173|3.6784071922302246|Princess Bride, T...|
|    183| 3.598633050918579|        Alien (1979)|
|    318| 3.577965497970581|Schindler's List ...|
|    191|3.4788103103637695|      Amadeus (1984)|
|     50|3.4640660285949707|    Star Wars (1977)|
|    216|3.3883326053619385|When Harry Met Sa...|
|    174| 3.376204490661621|Raiders of the Lo...|
|    313|3.3737378120422363|      Titanic (1997)|
|    172|3.3534553050994873|Empire Strikes Ba...|
|      7| 3.332129716873169|Twelve Monkeys (1...|
|     79| 3.235292911529541|Fugitive, The (1993)|
