# Question 4

In [1]:
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import *
import numpy as np
import itertools
## Explore all spark types
#from pyspark.sql.types import StructField, StructType, ByteType, DoubleType, DecimalType, StringType, ArrayType, IntegerType

In [2]:
#sc=SparkContext()
#spark=SparkSession(sc)

# Read actor_movies.txt and actress_movies.txt, filter out actors and actresses with less than 10 movies
actor_context=sc.textFile("actor_movies.txt")
actor_movies=actor_context.map(lambda x:x.split("\t\t")).filter(lambda x:len(x)>10).map(lambda x:[x[0],x[1:]])
actress_context=sc.textFile("actress_movies.txt")
actress_movies=actress_context.map(lambda x:x.split("\t\t")).filter(lambda x:len(x)>10).map(lambda x:[x[0],x[1:]])
name_movie=actor_movies.union(actress_movies)
# flatmap all movies correspond to each name, get an one name to one movie object
name1_movie1 = name_movie.flatMap(lambda (name,movie):[[name.strip(),movie[i].strip()] for i in range(len(movie))])

In [3]:
# transform name1_movie1 to spark dataframe name1_movie1_df to perform groupby operation
name1_movie1_scheme = StructType([
     StructField('Name', StringType(), True),
     StructField('Movie', StringType(), True)
    ])
name1_movie1_df = name1_movie1.toDF(schema=name1_movie1_scheme)

In [4]:
# after groupby, get one movie to all names correspondence
name_movie1_df = name1_movie1_df.groupBy("Movie").agg(F.collect_list("Name").alias("Name"))

In [5]:
name_movie1_df.take(1)

[Row(Movie=u'#28 (2002)', Name=[u'Fitzpatrick, Greg (I)', u'Niemi, Laura'])]

In [6]:
# filter out movies with less than 10 actors and actresses
movie1_name_df = name_movie1_df.filter(F.size("Name")> 9 ) 
movie1_name_df.count()

98749

In [7]:
# count how many actors and actresses each movie has
movie_count_df = movie1_name_df.withColumn("Count", F.size("Name")).select(["Movie","Count"])

In [8]:
movie_count_df.show(5)

+--------------------+-----+
|               Movie|Count|
+--------------------+-----+
| 10 Attitudes (2001)|   12|
|101 Reykjav�k (2000)|   11|
|12 Years a Slave ...|   20|
|22 Jump Street (2...|   22|
|45 Fathers (1937)...|   29|
+--------------------+-----+
only showing top 5 rows



In [11]:
# transform movie1_name_df to rdd format in order to flatmap
movie1_name = movie1_name_df.rdd
movie1_name.take(3)

[Row(Movie=u'10 Attitudes (2001)', Name=[u'Armas, Adrian', u'Bullock, Jim J.', u'Crowley, Ben (I)', u'Faustino, David', u'Fehsenfeld, Danny', u'Hara, Mitch', u'Kanan, Sean', u'Stuart, Jason (I)', u'Swann, Garrett', u'Vilanch, Bruce', u'Paul, Alexandra (III)', u'Tenuta, Judy']),
 Row(Movie=u'101 Reykjav\ufffdk (2000)', Name=[u'Einarsson, P\ufffdtur (I)', u'Eyj\ufffdlfsson, Gunnar', u'Gunnarsson, \ufffdr\ufffdstur Le\ufffd', u'Gu\ufffdnason, Hilmir Sn\ufffdr', u'Jonsson, Hilmar', u'Korm\ufffdkur, Baltasar', u'Sigur\ufffdarson, J\ufffdhann', u'\ufffdlafsson, \ufffdlafur Darri', u'Abril, Victoria', u'Backman, Edda Heidr\ufffdn', u'Karlsd\ufffdttir, Hanna Mar\ufffda']),
 Row(Movie=u'12 Years a Slave (2013)  (uncredited)', Name=[u'Arthur, Jon (II)', u'Braud, Sean Paul', u'Causin, Joseph Randy', u'Clare, Edward J.', u'Joyce, Mark (IV)', u'Klein, John C.', u'LeBlanc, Elton', u"Lewis, Gerard 'Jerry'", u'Montgomery, Ritchie', u'Parsons, Shawn', u'P\ufffdre, Wayne', u'Smith, Chaz', u'Tureaud, Tre

In [12]:
# after this flatmap, we get one movie to one name correspondence. Now this new RDD is difference than previous 
# name1_movie1 because we have filtered out some movies will less than 10 actors and actresses
movie1_name1_2 = movie1_name.flatMap(lambda (movie,name):[[movie.strip(),name[i].strip()] for i in range(len(name))])

In [14]:
# transform to dataframe to groupby name
name1_movie1_2_scheme = StructType([
     StructField('Movie', StringType(), True),
     StructField('Name', StringType(), True)
    ])
movie1_name1_2_df = movie1_name1_2.toDF(schema=name1_movie1_2_scheme)
name1_movie_2_df = movie1_name1_2_df.groupBy("Name").agg(F.collect_list("Movie").alias("Movie"))

In [15]:
name1_movie_2_df.show(3)

+--------------------+--------------------+
|                Name|               Movie|
+--------------------+--------------------+
|      Aabel, Per (I)|[Et sp�kelse fore...|
|     Abascal, Margot|[On a tr�s peu d'...|
|Abramson, Nicole (I)|[Valentine's Day ...|
+--------------------+--------------------+
only showing top 3 rows



In [16]:
# transfrom back to RDD to perform flatMap, get one name to all movies objects. 
# Then for each name, find movie pairs
# Finally, for each movie pair, find number of common actors and actresses
name1_movie_2 = name1_movie_2_df.rdd
movie_pairs = name1_movie_2.flatMap(lambda (name,movie): itertools.combinations(movie,2) )
movie_pairs = movie_pairs.map(lambda x:(x,1))
movie_pairs = movie_pairs.reduceByKey(lambda a,b : a+b)
movie_pairs = movie_pairs.map(lambda ((m1,m2),count):(m1,m2,count))

In [17]:
movie_pairs.take(3)

[(u'Mein Herz darfst du nicht fragen (1952)',
  u'Nach Regen scheint Sonne (1949)',
  1),
 (u'Kichamani M.B.A. (2007)', u'Unnikrishnante Adyathe Christmas (1989)', 1),
 (u"I megali stigmi tou '21: Papaflessas (1971)",
  u'Antes llega la muerte (1964)',
  1)]

In [18]:
# transform it to dataframe to perform join action
movie_pairs_scheme = StructType([
     StructField('Movie1', StringType(), True),
     StructField('Movie2', StringType(), True),
     StructField('IntersectCount', IntegerType(), True)
    ])
movie_pairs_df = movie_pairs.toDF(schema=movie_pairs_scheme)

In [19]:
# label all movies with unique id
movie_count_df=movie_count_df.withColumn("id",F.monotonically_increasing_id())

In [20]:
movie_count_df.count()

98749

In [198]:
movie_count_df.show(5)

+--------------------+-----+---+
|               Movie|Count| id|
+--------------------+-----+---+
|'G' Men (1935)  (...|   45|  0|
|  12 stulev (1971)  |   18|  1|
|       1408 (2007)  |   20|  2|
|22 Jump Street (2...|   44|  3|
|3's a Couple (201...|   12|  4|
+--------------------+-----+---+
only showing top 5 rows



In [21]:
movie_count_df.printSchema()

root
 |-- Movie: string (nullable = true)
 |-- Count: integer (nullable = false)
 |-- id: long (nullable = false)



In [22]:
movie_count_df.write.csv("movie_count")

In [23]:
os.system("cat /Users/AlChen/Desktop/EE232E_proj2/project_2_data/movie_count.csv/p* > /Users/AlChen/Desktop/EE232E_proj2/project_2_data/movie_count.csv/test.csv")

0

In [24]:
movie_pairs_df.count()

29766038

In [25]:
# join movie_count_df twice to movie_pairs_df
cond1=[movie_pairs_df.Movie1==movie_count_df.Movie]
pairs_1 = movie_pairs_df.join(movie_count_df,on=cond1,how="inner")
pairs_1=pairs_1.withColumnRenamed("Count","Count1").withColumnRenamed("id","id1")
pairs_1 = pairs_1.drop("Movie")
cond2=[pairs_1.Movie2==movie_count_df.Movie]
pairs_2 = pairs_1.join(movie_count_df,on=cond2,how="inner")
pairs = pairs_2.withColumnRenamed("Count","Count2").withColumnRenamed("id","id2").drop("Movie")

In [161]:
pairs_1.show(5)

+--------------------+--------------------+--------------+--------------------+-----+
|              Movie1|              Movie2|IntersectCount|               Movie|Count|
+--------------------+--------------------+--------------+--------------------+-----+
|A k�sz�v� ember f...|A tettes ismeretl...|             2|A k�sz�v� ember f...|   24|
|A k�sz�v� ember f...|   Visszaes�k (1983)|             2|A k�sz�v� ember f...|   24|
|A k�sz�v� ember f...|L�nyarcok t�k�rbe...|             1|A k�sz�v� ember f...|   24|
|A k�sz�v� ember f...|Gyalog a mennyors...|             8|A k�sz�v� ember f...|   24|
|A k�sz�v� ember f...|Mad�ch: Egy ember...|             1|A k�sz�v� ember f...|   24|
+--------------------+--------------------+--------------+--------------------+-----+
only showing top 5 rows



In [26]:
# calculate Jaccard Index
pairs_weight = pairs.withColumn("Weight",F.col("IntersectCount") / (F.col("Count1")+F.col("Count2")-F.col("IntersectCount")) )

In [27]:
pairs_weight.printSchema()

root
 |-- Movie1: string (nullable = true)
 |-- Movie2: string (nullable = true)
 |-- IntersectCount: integer (nullable = true)
 |-- Count1: integer (nullable = false)
 |-- id1: long (nullable = false)
 |-- Count2: integer (nullable = false)
 |-- id2: long (nullable = false)
 |-- Weight: double (nullable = true)



In [28]:
pairs_weight = pairs_weight.select("id1","id2","Weight")

In [29]:
pairs_weight.write.csv("movie_pairs",sep=" ")

In [30]:
os.system("cat /Users/AlChen/Desktop/EE232E_proj2/project_2_data/movie_pairs/p* > /Users/AlChen/Desktop/EE232E_proj2/project_2_data/movie_pairs/movie_pairs.csv")

0