In [1]:
from pyspark.sql import SQLContext
from pyspark import SparkContext
import json
import requests

In [2]:
# NOTE(review): `sc` is not defined in any visible cell — presumably the
# SparkContext injected by the PySpark kernel/shell; confirm before running
# this notebook on a fresh kernel.
sqlcontext = SQLContext(sc)



In [3]:
# Load the IMDb title extract (all columns are read as strings).
# Read through the `sqlcontext` created earlier in this notebook rather than a
# separately injected `sqlContext` global, so the cell works on a fresh kernel.
imdb = sqlcontext.read.csv('imdb.csv', header = True)

In [4]:
imdb.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- tconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- startYear: string (nullable = true)
 |-- genres: string (nullable = true)



In [5]:
imdb.show(5)

+---+---------+---------+--------------------+---------+--------------------+
|_c0|   tconst|titleType|        primaryTitle|startYear|              genres|
+---+---------+---------+--------------------+---------+--------------------+
|  8|tt0000009|    movie|          Miss Jerry|     1894|             Romance|
|498|tt0000502|    movie|            Bohemios|     1905|                  \N|
|570|tt0000574|    movie|The Story of the ...|     1906|Action,Adventure,...|
|587|tt0000591|    movie|    The Prodigal Son|     1907|               Drama|
|610|tt0000615|    movie|  Robbery Under Arms|     1907|               Drama|
+---+---------+---------+--------------------+---------+--------------------+
only showing top 5 rows



In [6]:
# Load the MovieLens tag/link extract (all columns are read as strings).
# Read through the `sqlcontext` created earlier in this notebook rather than a
# separately injected `sqlContext` global, so the cell works on a fresh kernel.
movielens = sqlcontext.read.csv('movielens.csv', header = True)

In [7]:
movielens.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- tag: string (nullable = true)
 |-- imdbId: string (nullable = true)
 |-- tmdbId: string (nullable = true)



In [8]:
# Load per-user ratings keyed by movieId (all columns are read as strings).
# Read through the `sqlcontext` created earlier in this notebook rather than a
# separately injected `sqlContext` global, so the cell works on a fresh kernel.
critic_ratings = sqlcontext.read.csv('ratings.csv', header = True)

In [9]:
critic_ratings.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [10]:
# Load the tab-separated ratings file keyed by tconst (all columns are strings).
# Read through the `sqlcontext` created earlier in this notebook rather than a
# separately injected `sqlContext` global, so the cell works on a fresh kernel.
user_ratings = sqlcontext.read.csv('data.tsv', header = True, sep = '\t')

In [11]:
user_ratings.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- averageRating: string (nullable = true)
 |-- numVotes: string (nullable = true)



In [12]:
# Expose each DataFrame to Spark SQL under a fixed view name.
# createOrReplaceTempView is the non-deprecated replacement for
# registerTempTable and can be re-run safely (it replaces an existing view).
imdb.createOrReplaceTempView("imdb")
movielens.createOrReplaceTempView("movielens")
critic_ratings.createOrReplaceTempView("critic_ratings")
user_ratings.createOrReplaceTempView("user_ratings")



In [13]:
# Join IMDb titles to MovieLens tags on a numeric id.
# SUBSTRING(tconst, 3, 9) takes up to 9 characters starting at position 3,
# i.e. it skips the two-character prefix ("tt" in the sample rows shown above);
# both sides are cast to INT before comparison. One output row per (title, tag).
sql1 = sqlcontext.sql(''' select CAST(SUBSTRING(tconst, 3, 9) as INT) as id,  primaryTitle, startYear, movieId, tag from imdb 
                          INNER JOIN movielens
                          on CAST(SUBSTRING(tconst, 3, 9) as INT) = CAST(imdbId as INT)''')

In [14]:
sql1.show(5)

+----+------------------+---------+-------+-------------+
|  id|      primaryTitle|startYear|movieId|          tag|
+----+------------------+---------+-------+-------------+
|4181|Judith of Bethulia|     1914|  84852|       attack|
|4181|Judith of Bethulia|     1914|  84852|        bible|
|4181|Judith of Bethulia|     1914|  84852|     disguise|
|4181|Judith of Bethulia|     1914|  84852|        siege|
|4181|Judith of Bethulia|     1914|  84852|D.W. Griffith|
+----+------------------+---------+-------+-------------+
only showing top 5 rows



In [15]:
# Make the joined title/tag result queryable as "sql1" in later SQL cells.
# createOrReplaceTempView is the non-deprecated replacement for registerTempTable.
sql1.createOrReplaceTempView("sql1")

In [16]:
# Convert the joined title/tag rows to JSON strings; later cells parse each
# element with json.loads before classifying it.
val = sql1.toJSON()

##### Keywords from Numbers.com and TMDB

racial : racism, racial, discrimination, segregation, racist, civil

addiction : drug, drugs, abuse, depression, addiction, depressing

rights: gay, queer, lgbt, lesbian

social commentary

empowerment: sexist, strong female lead, strong woman, strong women

immigration

In [17]:
#1 - Has the number of socially aware movies increased over the years as awareness has grown?

In [48]:
def get_social_value(data):
    """Flag a title/tag record as socially aware or not.

    Args:
        data: dict with required keys 'startYear' and 'id' and an optional
            'tag' string.

    Returns:
        ``(id, (startYear, flag))`` where ``flag`` is 1 when any keyword below
        occurs as a substring of the lower-cased tag, and 0 otherwise
        (including when the tag is missing).
    """
    tokens = {'racial' : ['racism', 'racial', 'discrimination', 'segregation', 'racist', 'civil', 'black', 'african', 'race'],
              'addiction' : ['drug', 'drugs', 'abuse', 'depression', 'addiction', 'depressing', 'suicide', 'psychiatrist',
                            'mental', 'addict', 'abandonment', 'alcoholism'],
              'rights': ['gay', 'queer', 'lgbt', 'lesbian', 'bisexual', 'homosexuality', 'transgender'],
              'social': ['social', 'social commentary', 'social criticism','societal', 'bully', 'bullying', 'pollution', 'corruption', 'crisis'],
              'empowerment': ['sexist', 'strong female lead', 'strong woman', 'strong women', 'woman', 'women'],
              'immigration': ['immigration', 'culture']}

    year = data['startYear']
    id1 = data['id']
    tag = data.get('tag')

    matched = False
    if tag is not None:
        lowered = tag.lower()
        # The theme a keyword belongs to is irrelevant here; any hit is enough.
        matched = any(keyword in lowered
                      for keywords in tokens.values()
                      for keyword in keywords)

    return (id1, (year, 1 if matched else 0))

In [49]:
def sum_tuple(vals):
    """Collapse a flat (year, flag, year, flag, ...) sequence for one movie.

    Args:
        vals: flat sequence whose even positions are years and whose odd
            positions are numeric flags.

    Returns:
        ``(year, social)`` where ``year`` is the last even-position item and
        ``social`` is 1 when the running total of the odd-position items is
        at least 1 after the last of them, else 0.
    """
    running_total = 0
    social = 0
    for position, item in enumerate(vals):
        if position % 2:
            running_total += item
            social = 1 if running_total >= 1 else 0
        else:
            year = item
    return (year, social)

In [50]:
def tot_count(vals):
    """Return ``(year, 1)`` for one movie so movies can be counted per year.

    Args:
        vals: flat sequence whose even positions are years; odd positions
            are ignored.

    Returns:
        ``(year, 1)`` where ``year`` is the last even-position item.
    """
    for position, item in enumerate(vals):
        if position % 2 == 0:
            year = item
    return (year, 1)

In [51]:
# Number of movies per year that carry at least one socially aware tag,
# sorted by that number in descending order.
solution = (
    val.map(json.loads)                              # JSON string -> dict
    .map(get_social_value)                           # -> (id, (year, flag))
    .reduceByKey(lambda left, right: left + right)   # concatenate tuples per movie id
    .mapValues(sum_tuple)                            # -> (id, (year, 0/1))
    .map(lambda pair: pair[1])                       # drop the id; key becomes year
    .reduceByKey(lambda left, right: left + right)   # add up the flags per year
    .sortBy(lambda pair: -pair[1])
)

In [52]:
# Number of distinct movie ids per year (each movie contributes exactly 1).
counts = (
    val.map(json.loads)                              # JSON string -> dict
    .map(get_social_value)                           # -> (id, (year, flag))
    .reduceByKey(lambda left, right: left + right)   # concatenate tuples per movie id
    .mapValues(tot_count)                            # -> (id, (year, 1))
    .map(lambda pair: pair[1])                       # drop the id; key becomes year
    .reduceByKey(lambda left, right: left + right)   # movies per year
)

In [53]:
# Share of socially aware movies per year, formatted as "year,ratio" strings.
# Years whose ratio is not below 1 are dropped by the filter.
percentage = (
    solution.join(counts)                                        # (year, (social, total))
    .map(lambda pair: (pair[0], pair[1][0] / pair[1][1]))
    .sortBy(lambda pair: -pair[1])
    .filter(lambda pair: pair[1] < 1)
    .map(lambda pair: pair[0] + ',' + str(pair[1]))
)

In [54]:
percentage.collect()

['2009,0.3883029721955896',
 '1998,0.3767258382642998',
 '2005,0.3761348897535668',
 '2004,0.37586685159500693',
 '2013,0.37384615384615383',
 '2012,0.3724832214765101',
 '2006,0.37058152793614596',
 '2000,0.36764705882352944',
 '2001,0.3676222596964587',
 '2007,0.3625541125541126',
 '2008,0.3575757575757576',
 '1997,0.35247524752475246',
 '1996,0.3521739130434783',
 '2010,0.3508424182358771',
 '2011,0.3494525547445255',
 '2002,0.3438485804416404',
 '2014,0.3433696348494555',
 '2016,0.34335337341349365',
 '1992,0.34097421203438394',
 '2003,0.33983739837398375',
 '2015,0.33420879842416285',
 '1915,0.3333333333333333',
 '1995,0.330188679245283',
 '1999,0.3203125',
 '1993,0.3048128342245989',
 '1994,0.28741092636579574',
 '1990,0.28055555555555556',
 '1986,0.28',
 '1991,0.27945205479452057',
 '1984,0.2782874617737003',
 '2017,0.2767102229054573',
 '2019,0.2631578947368421',
 '2018,0.2630272952853598',
 '1973,0.2545931758530184',
 '1983,0.2516778523489933',
 '1975,0.25157232704402516',
 '1

In [55]:
#2 Do audiences prefer certain movie themes over others? Are there themes that audiences do not want to see on the big screen?

# Per (title, tag) row, compute diff = user_rating - critic:
#   a: sql1 rows joined to user_ratings on the numeric part of tconst,
#      carrying averageRating as user_rating.
#   b: per-movieId average of critic_ratings.rating, divided by 5 and
#      multiplied by 10 (presumably rescaling a 0-5 scale to 0-10 — confirm).
# NOTE(review): the rating columns were read as strings; the arithmetic relies
# on Spark SQL's implicit numeric casts.
sql2 = sqlcontext.sql(''' select id, primaryTitle, startYear, a.movieId, tag, (a.user_rating - b.critic) as diff from
                          (
                          (select id, primaryTitle, startYear, movieId, tag, u.averageRating as user_rating from sql1 
                          INNER JOIN user_ratings u
                          on id = CAST(SUBSTRING(tconst, 3, 9) as INT)) a
                          JOIN (select movieId, ((AVG(rating)/5) * 10) as critic from critic_ratings 
                          GROUP BY movieId) b
                          ON a.movieId = b. movieId)
                          ''')

In [56]:
sql2.show()

+-------+--------------------+---------+-------+-----------------+--------------------+
|     id|        primaryTitle|startYear|movieId|              tag|                diff|
+-------+--------------------+---------+-------+-----------------+--------------------+
|1606384|              My Way|     2011| 100062|     world war ii|  0.5024390243902435|
|1606384|              My Way|     2011| 100062|     press-ganged|  0.5024390243902435|
|1192624|  Punching the Clown|     2009| 100070|         comedian| 0.12307692307692353|
|1192624|  Punching the Clown|     2009| 100070|           comedy| 0.12307692307692353|
|1192624|  Punching the Clown|     2009| 100070|      good humour| 0.12307692307692353|
|1192624|  Punching the Clown|     2009| 100070|         comedian| 0.12307692307692353|
|1192624|  Punching the Clown|     2009| 100070|struggling career| 0.12307692307692353|
|2153963|                Tabu|     2012| 100277|      silent film| 0.23877551020408116|
|2153963|                Tabu|  

In [57]:
# Convert the rating-difference rows to JSON strings; later cells parse each
# element with json.loads before classifying it.
tab2 = sql2.toJSON()

In [58]:
def get_social_value1(data):
    """Label a title/tag record with a social theme and carry its rating diff.

    Args:
        data: dict with required key 'id' and optional keys 'tag' (string)
            and 'diff'. 'startYear' is not used and therefore not required.

    Returns:
        ``(id, (theme, diff))``. ``theme`` is the name of the last theme (in
        the order listed below) that has a keyword occurring as a substring
        of the lower-cased tag, or None when nothing matches or the tag is
        missing. ``diff`` is ``data.get('diff')``.

    Raises:
        KeyError: if 'id' is missing.
    """
    tokens = {'racial' : ['racism', 'racial', 'discrimination', 'segregation', 'racist', 'civil', 'black', 'african', 'race'],
              'addiction' : ['drug', 'drugs', 'abuse', 'depression', 'addiction', 'depressing', 'suicide', 'psychiatrist',
                            'mental', 'addict', 'abandonment', 'alcoholism'],
              'rights': ['gay', 'queer', 'lgbt', 'lesbian', 'bisexual', 'homosexuality', 'transgender'],
              'social': ['social', 'social commentary', 'social criticism','societal', 'bully', 'bullying', 'pollution', 'corruption', 'crisis'],
              'empowerment': ['sexist', 'strong female lead', 'strong woman', 'strong women', 'woman', 'women'],
              'immigration': ['immigration', 'culture']}

    # The previous version also read data['startYear'] into an unused local,
    # which raised KeyError for records lacking that key for no benefit.
    id1 = data['id']
    tag = data.get('tag')
    diff = data.get('diff')

    val = None
    if tag is not None:
        lowered = tag.lower()
        # Keep iterating after a hit: a later matching theme overrides an
        # earlier one, as before.
        for theme, keywords in tokens.items():
            if any(keyword in lowered for keyword in keywords):
                val = theme

    return (id1, (val, diff))

In [59]:
def sum_tuple1(vals):
    """Collapse a flat (theme, diff, theme, diff, ...) sequence for one movie.

    Args:
        vals: flat sequence whose even positions are theme names (or None)
            and whose odd positions are rating differences.

    Returns:
        ``(theme, good)`` where ``theme`` is the last even-position item.
        ``good`` becomes 1 when a pair has a non-None theme and a diff
        greater than 1, and is reset to 0 by any pair whose theme is None,
        so the result depends on the order of the pairs.
    """
    good = 0
    for position, item in enumerate(vals):
        if position % 2 == 0:
            id1 = item
            continue
        if id1 == None:
            # A pair without a theme clears whatever was flagged before it.
            good = 0
        elif item > 1:
            good = 1
    return (id1, good)

In [60]:
def counts1(vals):
    """Return ``(theme, 0/1)`` for one movie so movies can be counted per theme.

    Args:
        vals: flat sequence whose even positions are theme names (or None);
            the values at odd positions are not inspected.

    Returns:
        ``(theme, good)`` where ``theme`` is the last even-position item and
        ``good`` is 1 when the theme of the last complete pair is not None,
        else 0.
    """
    good = 0
    for position, item in enumerate(vals):
        if position % 2 == 0:
            id1 = item
        else:
            good = 0 if id1 == None else 1
    return (id1, good)

In [61]:
# Per theme, the number of movies flagged by sum_tuple1, sorted descending.
# Note: this rebinds `solution`, which an earlier cell used for the per-year counts.
solution = (
    tab2.map(json.loads)                             # JSON string -> dict
    .map(get_social_value1)                          # -> (id, (theme, diff))
    .reduceByKey(lambda left, right: left + right)   # concatenate tuples per movie id
    .mapValues(sum_tuple1)                           # -> (id, (theme, 0/1))
    .map(lambda pair: pair[1])                       # drop the id; key becomes theme
    .reduceByKey(lambda left, right: left + right)   # add up the flags per theme
    .sortBy(lambda pair: -pair[1])
)

In [62]:
solution.collect()

[('empowerment', 256),
 ('rights', 31),
 ('addiction', 20),
 ('social', 19),
 ('racial', 14),
 ('immigration', 1),
 (None, 0)]

In [63]:
# Per theme, the number of movies counted by counts1, sorted descending.
# Themes whose total is not greater than 1 are dropped by the filter.
# Note: this rebinds `counts`, which an earlier cell used for the per-year totals.
counts = (
    tab2.map(json.loads)                             # JSON string -> dict
    .map(get_social_value1)                          # -> (id, (theme, diff))
    .reduceByKey(lambda left, right: left + right)   # concatenate tuples per movie id
    .mapValues(counts1)                              # -> (id, (theme, 0/1))
    .map(lambda pair: pair[1])                       # drop the id; key becomes theme
    .reduceByKey(lambda left, right: left + right)   # movies per theme
    .filter(lambda pair: pair[1] > 1)
    .sortBy(lambda pair: -pair[1])
)

In [64]:
counts.collect()

[('empowerment', 1570),
 ('rights', 306),
 ('addiction', 256),
 ('racial', 173),
 ('social', 149),
 ('immigration', 28)]

In [65]:
# Per theme, flagged movies divided by counted movies, highest ratio first.
percent = (
    solution.join(counts)                                        # (theme, (flagged, total))
    .map(lambda pair: (pair[0], pair[1][0] / pair[1][1]))
    .sortBy(lambda pair: -pair[1])
)

In [66]:
percent.collect()

[('empowerment', 0.16305732484076432),
 ('social', 0.12751677852348994),
 ('rights', 0.10130718954248366),
 ('racial', 0.08092485549132948),
 ('addiction', 0.078125),
 ('immigration', 0.03571428571428571)]

In [67]:
#3 Do socially aware movies get low budgets and box office collections?

In [68]:
# Load the TMDB revenue/budget extract (all columns are read as strings).
# Read through the `sqlcontext` created earlier in this notebook rather than a
# separately injected `sqlContext` global, so the cell works on a fresh kernel.
tmdb = sqlcontext.read.csv('tmdb.csv', header = True)

In [69]:
tmdb.show()

+---+-------+------+----------+---------+
|_c0|imdb_id|    id|   revenue|   budget|
+---+-------+------+----------+---------+
|  0|  76759|    11| 775398007| 11000000|
|  1| 118715|   115|  46969409| 15000000|
|  2|  62512|   667| 111584787|  9500000|
|  3| 425210|   186|  56308881| 27000000|
|  4|2872718|242582|  50300000|  8500000|
|  5| 470752|264660|  36869414| 15000000|
|  6|2332623|197599|         0|        0|
|  7|3783958|313369| 447407695| 30000000|
|  8|5052448|419430| 255407969|  4500000|
|  9|  63522|   805|  33395426|  3200000|
| 10|  87544|    81|   3301446|  1000000|
| 11|  86190|  1892| 572700000| 32350000|
| 12| 106582|  9350| 255000211| 70000000|
| 13| 146882|   243|  47126295| 30000000|
| 14|7475540|489987|         0|        0|
| 15|  96754|  2756|  90000098| 70000000|
| 16| 181852|   296| 435000000|200000000|
| 17| 499549| 19995|2920357254|237000000|
| 18|1375666| 27205| 825532764|160000000|
| 19|1408101| 54138| 467365246|190000000|
+---+-------+------+----------+---

In [70]:
tmdb.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- id: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- budget: string (nullable = true)



In [71]:
# Make the TMDB data queryable as "tmdb" in later SQL cells.
# createOrReplaceTempView is the non-deprecated replacement for registerTempTable.
tmdb.createOrReplaceTempView("tmdb")

In [72]:
# Attach TMDB revenue and budget to every MovieLens tag row of the same movie.
# NOTE(review): unlike the sql1 join, the two id columns are compared without
# casting to INT, so the match depends on both files formatting the id the
# same way — confirm.
sql3 = sqlcontext.sql(''' select imdb_id, tag, revenue, budget from tmdb
                          INNER JOIN movielens
                          on imdb_id = imdbId''')

In [73]:
sql3.show()

+-------+---------+---------+--------+
|imdb_id|      tag|  revenue|  budget|
+-------+---------+---------+--------+
|  76759|  classic|775398007|11000000|
|  76759|  classic|775398007|11000000|
|  76759|  classic|775398007|11000000|
|  76759|  classic|775398007|11000000|
|  76759|  classic|775398007|11000000|
|  76759|  classic|775398007|11000000|
|  76759|   sci-fi|775398007|11000000|
|  76759|   sci-fi|775398007|11000000|
|  76759|   sci-fi|775398007|11000000|
|  76759|   sci-fi|775398007|11000000|
|  76759|   sci-fi|775398007|11000000|
|  76759|   sci-fi|775398007|11000000|
|  76759|   action|775398007|11000000|
|  76759|   action|775398007|11000000|
|  76759|   action|775398007|11000000|
|  76759|   action|775398007|11000000|
|  76759|   action|775398007|11000000|
|  76759|   action|775398007|11000000|
|  76759|adventure|775398007|11000000|
|  76759|adventure|775398007|11000000|
+-------+---------+---------+--------+
only showing top 20 rows



In [74]:
# Convert the tag/revenue/budget rows to JSON strings; later cells parse each
# element with json.loads before classifying it.
tab3 = sql3.toJSON()

In [75]:
def get_social_value2(data):
    """Label a tag/revenue/budget record with a social theme.

    Args:
        data: dict with required key 'imdb_id' and optional keys 'tag'
            (string), 'revenue' and 'budget'.

    Returns:
        ``(imdb_id, (theme, budget, revenue))``. ``theme`` is the name of the
        last theme (in the order listed below) that has a keyword occurring
        as a substring of the lower-cased tag, or None when nothing matches
        or the tag is missing.
    """
    tokens = {'racial' : ['racism', 'racial', 'discrimination', 'segregation', 'racist', 'civil', 'black', 'african', 'race'],
              'addiction' : ['drug', 'drugs', 'abuse', 'depression', 'addiction', 'depressing', 'suicide', 'psychiatrist',
                            'mental', 'addict', 'abandonment', 'alcoholism'],
              'rights': ['gay', 'queer', 'lgbt', 'lesbian', 'bisexual', 'homosexuality', 'transgender'],
              'social': ['social', 'social commentary', 'social criticism','societal', 'bully', 'bullying', 'pollution', 'corruption', 'crisis'],
              'empowerment': ['sexist', 'strong female lead', 'strong woman', 'strong women', 'woman', 'women'],
              'immigration': ['immigration', 'culture']}

    id1 = data['imdb_id']
    tag = data.get('tag')
    revenue = data.get('revenue')
    budget = data.get('budget')

    val = None
    if tag is not None:
        lowered = tag.lower()
        # No early exit: a later matching theme overrides an earlier one.
        for theme, keywords in tokens.items():
            if any(keyword in lowered for keyword in keywords):
                val = theme

    return (id1, (val, budget, revenue))

In [76]:
def sum_tuple2(vals):
    """Collapse a flat (theme, budget, revenue, ...) sequence for one movie.

    Args:
        vals: flat sequence of repeating triples: theme name (or None),
            budget, revenue. Budget and revenue must be accepted by int().

    Returns:
        ``(theme, good)`` where ``theme`` is the last theme-position item.
        Within a triple whose theme is not None, ``good`` is set to 1 when
        the budget is <= 2000000, and is then always reassigned from the
        revenue check (1 when revenue <= 20000000, else 0). A triple whose
        theme is None resets ``good`` to 0 at its budget position.

    NOTE(review): for a complete triple the budget comparison never survives
    the revenue comparison, and the two thresholds differ by a factor of
    ten — confirm both are intended.
    """
    good = 0
    for position, item in enumerate(vals):
        slot = position % 3
        if slot == 0:
            id1 = item
        elif id1 == None:
            # Only the budget slot resets the flag for an unthemed triple.
            if slot == 1:
                good = 0
        elif slot == 1:
            if int(item) <= 2000000:
                good = 1
        else:
            good = 1 if int(item) <= 20000000 else 0
    return (id1, good)

In [77]:
# Per theme, the number of movies flagged by sum_tuple2, sorted descending.
solution3 = (
    tab3.map(json.loads)                             # JSON string -> dict
    .map(get_social_value2)                          # -> (imdb_id, (theme, budget, revenue))
    .reduceByKey(lambda left, right: left + right)   # concatenate tuples per movie id
    .mapValues(sum_tuple2)                           # -> (imdb_id, (theme, 0/1))
    .map(lambda pair: pair[1])                       # drop the id; key becomes theme
    .reduceByKey(lambda left, right: left + right)   # add up the flags per theme
    .sortBy(lambda pair: -pair[1])
)

In [78]:
solution3.collect()

[('empowerment', 1757),
 ('rights', 302),
 ('addiction', 253),
 ('racial', 176),
 ('social', 140),
 ('immigration', 26),
 (None, 0)]

In [87]:
def counts2(vals):
    """Return ``(theme, 0/1)`` for one movie so movies can be counted per theme.

    Args:
        vals: flat sequence of repeating triples whose first item is a theme
            name (or None); the other two items are not inspected.

    Returns:
        ``(theme, good)`` where ``theme`` is the last theme-position item and
        ``good`` is 1 when the theme in effect at the last second-position
        item is not None, else 0.
    """
    good = 0
    for position, item in enumerate(vals):
        slot = position % 3
        if slot == 0:
            id1 = item
        elif slot == 1:
            good = 0 if id1 == None else 1
    return (id1, good)

In [88]:
# Per theme, the number of movies counted by counts2, sorted descending.
# Themes whose total is not greater than 1 are dropped by the filter.
# Note: this rebinds `counts`, which earlier cells used for other totals.
counts = (
    tab3.map(json.loads)                             # JSON string -> dict
    .map(get_social_value2)                          # -> (imdb_id, (theme, budget, revenue))
    .reduceByKey(lambda left, right: left + right)   # concatenate tuples per movie id
    .mapValues(counts2)                              # -> (imdb_id, (theme, 0/1))
    .map(lambda pair: pair[1])                       # drop the id; key becomes theme
    .reduceByKey(lambda left, right: left + right)   # movies per theme
    .filter(lambda pair: pair[1] > 1)
    .sortBy(lambda pair: -pair[1])
)

In [89]:
counts.collect()

[('empowerment', 1784),
 ('rights', 309),
 ('addiction', 272),
 ('racial', 194),
 ('social', 155),
 ('immigration', 29)]

In [93]:
# Per theme, flagged movies divided by counted movies, highest ratio first,
# formatted as "theme,ratio" strings.
percent3 = (
    solution3.join(counts)                                       # (theme, (flagged, total))
    .map(lambda pair: (pair[0], pair[1][0] / pair[1][1]))
    .sortBy(lambda pair: -pair[1])
    .map(lambda pair: pair[0] + ',' + str(pair[1]))
)

In [94]:
percent3.collect()

['empowerment,0.9848654708520179',
 'rights,0.9773462783171522',
 'addiction,0.9301470588235294',
 'racial,0.9072164948453608',
 'social,0.9032258064516129',
 'immigration,0.896551724137931']

In [98]:
# Concatenate the part files of a saved output directory into one CSV.
# NOTE(review): no visible cell writes anuraagr_si618_proj_output4/ — presumably
# it came from a save step in a cell that is no longer in the notebook; confirm
# before relying on Restart & Run All.
!cat anuraagr_si618_proj_output4/part* > anuraagr_proj_output5.csv